dcapwell commented on code in PR #17:
URL: https://github.com/apache/cassandra-accord/pull/17#discussion_r1055860712


##########
accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java:
##########
@@ -96,7 +95,7 @@ public void stop()
         }
         catch (InterruptedException e)
         {
-            throw new UncheckedInterruptedException(e);
+            throw new RuntimeException(e);

Review Comment:
   This would have a negative side effect with Simulator: we check whether the 
thrown exception is an `UncheckedInterruptedException` and act differently, so 
this would change behavior there.



##########
accord-core/src/main/java/accord/local/Node.java:
##########
@@ -197,12 +202,12 @@ public void withEpoch(long epoch, Runnable runnable)
         else
         {
             configService.fetchTopologyForEpoch(epoch);
-            topology.awaitEpoch(epoch).addListener(runnable);
+            topology.awaitEpoch(epoch).addCallback(runnable);
         }
     }
 
     @Inline
-    public <T> Future<T> withEpoch(long epoch, Supplier<Future<T>> supplier)
+    public <T> AsyncResult<T> withEpoch(long epoch, Supplier<AsyncResult<T>> 
supplier)

Review Comment:
   2 of the 3 usages store the result into a cache of some sort, but given the 
bias towards `AsyncChain` I think we should work with chains instead. Assuming 
we include Benedict's change that makes `AsyncResult` implement `AsyncChain`, 
we can change this to
   
   ```
   public <T> AsyncChain<T> withEpoch(long epoch, Supplier<? extends 
AsyncChain<T>> supplier)
       {
           if (topology.hasEpoch(epoch))
           {
               return supplier.get();
           }
           else
           {
               configService.fetchTopologyForEpoch(epoch);
               return topology.awaitEpoch(epoch).toChain().flatMap(ignore -> 
supplier.get());
           }
       }
   ```
   
   The side effect of this is that the callers have to change to call 
`beginAsResult`, but it means this method is no longer biased towards 
completing the chain in the middle.



##########
accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java:
##########
@@ -0,0 +1,150 @@
+package accord.utils.async;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+abstract class AsyncChainCombiner<I, O> extends AsyncChains.Head<O>
+{
+    private static final AtomicIntegerFieldUpdater<AsyncChainCombiner> 
REMAINING = AtomicIntegerFieldUpdater.newUpdater(AsyncChainCombiner.class, 
"remaining");
+    private volatile Object state;
+    private volatile BiConsumer<? super O, Throwable> callback;
+    private volatile int remaining;
+
+    protected AsyncChainCombiner(List<AsyncChain<I>> inputs)
+    {
+        Preconditions.checkArgument(!inputs.isEmpty());
+        this.state = inputs;
+    }
+
+    private List<AsyncChain<? extends I>> inputs()
+    {
+        Object current = state;
+        Preconditions.checkState(current instanceof List);

Review Comment:
   nit: when this isn't true, it's great to know what the class actually is, so 
it's easier to debug what happened



##########
accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java:
##########
@@ -0,0 +1,150 @@
+package accord.utils.async;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+abstract class AsyncChainCombiner<I, O> extends AsyncChains.Head<O>
+{
+    private static final AtomicIntegerFieldUpdater<AsyncChainCombiner> 
REMAINING = AtomicIntegerFieldUpdater.newUpdater(AsyncChainCombiner.class, 
"remaining");
+    private volatile Object state;
+    private volatile BiConsumer<? super O, Throwable> callback;
+    private volatile int remaining;
+
+    protected AsyncChainCombiner(List<AsyncChain<I>> inputs)
+    {
+        Preconditions.checkArgument(!inputs.isEmpty());
+        this.state = inputs;
+    }
+
+    private List<AsyncChain<? extends I>> inputs()
+    {
+        Object current = state;
+        Preconditions.checkState(current instanceof List);
+        return (List<AsyncChain<? extends I>>) current;
+    }
+
+    private I[] results()
+    {
+        Object current = state;
+        Preconditions.checkState(current instanceof Object[]);
+        return (I[]) current;
+    }
+
+    void add(AsyncChain<I> chain)
+    {
+        inputs().add(chain);
+    }
+
+    void addAll(List<AsyncChain<I>> chains)
+    {
+        inputs().addAll(chains);
+    }
+
+    int size()
+    {
+        Object current = state;
+        if (current instanceof List)
+            return ((List) current).size();
+        if (current instanceof Object[])
+            return ((Object[]) current).length;
+        throw new IllegalStateException();
+    }
+
+    abstract void complete(I[] results, BiConsumer<? super O, Throwable> 
callback);
+
+    private void callback(int idx, I result, Throwable throwable)
+    {
+        int current = remaining;
+        if (current == 0)
+            return;
+
+        if (throwable != null && REMAINING.compareAndSet(this, current, 0))
+        {
+            callback.accept(null, throwable);
+            return;
+        }
+
+        results()[idx] = result;
+        if (REMAINING.decrementAndGet(this) == 0)
+        {
+            try
+            {
+                complete(results(), callback);
+            }
+            catch (Throwable t)
+            {
+                callback.accept(null, t);
+            }
+        }
+    }
+
+    private BiConsumer<I, Throwable> callbackFor(int idx)
+    {
+        return (result, failure) -> callback(idx, result, failure);
+    }
+
+    @Override
+    public void begin(BiConsumer<? super O, Throwable> callback)
+    {
+        List<AsyncChain<? extends I>> chains = inputs();
+        state = new Object[chains.size()];
+
+        int size = chains.size();
+        this.callback = callback;
+        this.remaining = size;
+        for (int i=0; i<size; i++)
+            chains.get(i).begin(callbackFor(i));
+    }
+
+    static class All<V> extends AsyncChainCombiner<V, List<V>>
+    {
+        All(List<AsyncChain<V>> asyncChains)
+        {
+            super(asyncChains);
+        }
+
+        @Override
+        void complete(V[] results, BiConsumer<? super List<V>, Throwable> 
callback)
+        {
+            List<V> result = Lists.newArrayList(results);

Review Comment:
   This isn't available in Java 8; can we use `Arrays.asList` or wrap it instead?
   
   see https://docs.oracle.com/javase/8/docs/api/java/util/List.html



##########
accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java:
##########
@@ -0,0 +1,150 @@
+package accord.utils.async;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+abstract class AsyncChainCombiner<I, O> extends AsyncChains.Head<O>
+{
+    private static final AtomicIntegerFieldUpdater<AsyncChainCombiner> 
REMAINING = AtomicIntegerFieldUpdater.newUpdater(AsyncChainCombiner.class, 
"remaining");
+    private volatile Object state;
+    private volatile BiConsumer<? super O, Throwable> callback;
+    private volatile int remaining;
+
+    protected AsyncChainCombiner(List<AsyncChain<I>> inputs)
+    {
+        Preconditions.checkArgument(!inputs.isEmpty());
+        this.state = inputs;
+    }
+
+    private List<AsyncChain<? extends I>> inputs()
+    {
+        Object current = state;
+        Preconditions.checkState(current instanceof List);
+        return (List<AsyncChain<? extends I>>) current;
+    }
+
+    private I[] results()
+    {
+        Object current = state;
+        Preconditions.checkState(current instanceof Object[]);
+        return (I[]) current;
+    }
+
+    void add(AsyncChain<I> chain)
+    {
+        inputs().add(chain);
+    }
+
+    void addAll(List<AsyncChain<I>> chains)
+    {
+        inputs().addAll(chains);
+    }
+
+    int size()
+    {
+        Object current = state;
+        if (current instanceof List)
+            return ((List) current).size();
+        if (current instanceof Object[])
+            return ((Object[]) current).length;
+        throw new IllegalStateException();
+    }
+
+    abstract void complete(I[] results, BiConsumer<? super O, Throwable> 
callback);
+
+    private void callback(int idx, I result, Throwable throwable)
+    {
+        int current = remaining;
+        if (current == 0)
+            return;
+
+        if (throwable != null && REMAINING.compareAndSet(this, current, 0))

Review Comment:
   Shouldn't this be a CAS loop? The following interleaving appears to do the 
wrong thing:
   
   ```
   T1: callback(?, null, NullPointerException), current = 4
   T2: callback(?, "success", null), current = 4
   T2: update results, `REMAINING.decrementAndGet(this)`
   T1: throwable != null == true, REMAINING.compareAndSet(this, current, 0) == 
false
   T1: updates results with a null <- unexpected behavior
   ```



##########
accord-core/src/main/java/accord/utils/async/AsyncChains.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils.async;
+
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import static accord.utils.async.AsyncChainCombiner.Reduce;
+
+public abstract class AsyncChains<V> implements AsyncChain<V>
+{
+    static class Immediate<V> implements AsyncChain<V>
+    {
+        static class FailureHolder
+        {
+            final Throwable cause;
+            FailureHolder(Throwable cause)
+            {
+                this.cause = cause;
+            }
+        }
+
+        final private Object value;
+        private Immediate(V success) { this.value = success; }
+        private Immediate(Throwable failure) { this.value = new 
FailureHolder(failure); }
+
+        @Override
+        public <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper)
+        {
+            if (value != null && value.getClass() == FailureHolder.class)
+                return (AsyncChain<T>) this;
+            return new Immediate<>(mapper.apply((V) value));
+        }
+
+        @Override
+        public <T> AsyncChain<T> flatMap(Function<? super V, ? extends 
AsyncChain<T>> mapper)
+        {
+            if (value != null && value.getClass() == FailureHolder.class)
+                return (AsyncChain<T>) this;
+            return mapper.apply((V) value);
+        }
+
+        @Override
+        public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> 
callback)
+        {
+            if (value == null || value.getClass() != FailureHolder.class)
+                callback.accept((V) value, null);
+            else
+                callback.accept(null, ((FailureHolder)value).cause);
+            return this;
+        }
+
+        @Override
+        public void begin(BiConsumer<? super V, Throwable> callback)
+        {
+            addCallback(callback);
+        }
+    }
+
+    public abstract static class Head<V> extends AsyncChains<V> implements 
BiConsumer<V, Throwable>
+    {
+        protected Head()
+        {
+            super(null);
+            next = this;
+        }
+
+        void begin()
+        {
+            begin(next);
+        }
+
+        @Override
+        public void accept(V v, Throwable throwable)
+        {
+            // we implement here just to simplify logic a little
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    static abstract class Link<I, O> extends AsyncChains<O> implements 
BiConsumer<I, Throwable>
+    {
+        protected Link(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void begin(BiConsumer<? super O, Throwable> callback)
+        {
+            Preconditions.checkArgument(!(callback instanceof 
AsyncChains.Head));
+            Preconditions.checkState(next instanceof AsyncChains.Head);

Review Comment:
   nit: would be great to have useful error messages



##########
accord-core/src/main/java/accord/utils/async/AsyncResults.java:
##########
@@ -0,0 +1,309 @@
+package accord.utils.async;
+
+import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+public class AsyncResults
+{
+    private AsyncResults() {}
+
+    private static class Result<V>
+    {
+        final V value;
+        final Throwable failure;
+
+        public Result(V value, Throwable failure)
+        {
+            this.value = value;
+            this.failure = failure;
+        }
+    }
+
+    static class AbstractResult<V> implements AsyncResult<V>
+    {
+        private static final AtomicReferenceFieldUpdater<AbstractResult, 
Object> STATE = AtomicReferenceFieldUpdater.newUpdater(AbstractResult.class, 
Object.class, "state");
+
+        private volatile Object state;
+
+        private static class Listener<V>
+        {
+            final BiConsumer<? super V, Throwable> callback;
+            Listener<V> next;
+
+            public Listener(BiConsumer<? super V, Throwable> callback)
+            {
+                this.callback = callback;
+            }
+        }
+
+        private void notify(Listener<V> listener, Result<V> result)
+        {
+            while (listener != null)
+            {
+                listener.callback.accept(result.value, result.failure);
+                listener = listener.next;
+            }
+        }
+
+        boolean trySetResult(Result<V> result)
+        {
+            while (true)
+            {
+                Object current = state;
+                if (current instanceof Result)
+                    return false;
+                Listener<V> listener = (Listener<V>) current;
+                if (STATE.compareAndSet(this, current, result))
+                {
+                    notify(listener, result);
+                    return true;
+                }
+            }
+        }
+
+        boolean trySetResult(V result, Throwable failure)
+        {
+            return trySetResult(new Result<>(result, failure));
+        }
+
+        void setResult(Result<V> result)
+        {
+            if (!trySetResult(result))
+                throw new IllegalStateException("Result has already been set 
on " + this);
+        }
+
+        void setResult(V result, Throwable failure)
+        {
+            if (!trySetResult(result, failure))
+                throw new IllegalStateException("Result has already been set 
on " + this);
+        }
+
+        @Override
+        public void addCallback(BiConsumer<? super V, Throwable> callback)
+        {
+            Listener<V> listener = null;
+            while (true)
+            {
+                Object current = state;
+                if (current instanceof Result)
+                {
+                    Result<V> result = (Result<V>) current;
+                    callback.accept(result.value, result.failure);
+                    return;
+                }
+                if (listener == null)
+                    listener = new Listener<>(callback);
+
+                listener.next = (Listener<V>) current;
+                if (STATE.compareAndSet(this, current, listener))
+                    return;
+            }
+        }
+
+        @Override
+        public boolean isDone()
+        {
+            return state instanceof Result;
+        }
+
+        @Override
+        public boolean isSuccess()
+        {
+            Object current = state;
+            return current instanceof Result && ((Result) current).failure == 
null;
+        }
+    }
+
+    static class Chain<V> extends AbstractResult<V>
+    {
+        public Chain(AsyncChain<V> chain)
+        {
+            chain.begin(this::setResult);
+        }
+    }
+
+    public static class Settable<V> extends AbstractResult<V> implements 
AsyncResult.Settable<V>
+    {
+
+        @Override
+        public boolean trySuccess(V value)
+        {
+            return trySetResult(value, null);
+        }
+
+        @Override
+        public boolean tryFailure(Throwable throwable)
+        {
+            return trySetResult(null, throwable);
+        }
+    }
+
+    static class Immediate<V> implements AsyncResult<V>
+    {
+        private final V value;
+        private final Throwable failure;
+
+        Immediate(V value)
+        {
+            this.value = value;
+            this.failure = null;
+        }
+
+        Immediate(Throwable failure)
+        {
+            this.value = null;
+            this.failure = failure;
+        }
+
+        @Override
+        public void addCallback(BiConsumer<? super V, Throwable> callback)
+        {
+            callback.accept(value, failure);
+        }
+
+        @Override
+        public boolean isDone()
+        {
+            return true;
+        }
+
+        @Override
+        public boolean isSuccess()
+        {
+            return failure == null;
+        }
+    }
+
+    /**
+     * Creates an AsyncResult for the given chain. This calls begin on the 
supplied chain
+     */
+    public static <V> AsyncResult<V> forChain(AsyncChain<V> chain)
+    {
+        return new Chain<>(chain);
+    }
+
+    public static <V> AsyncResult<V> success(V value)
+    {
+        return new Immediate<>(value);
+    }
+
+    public static <V> AsyncResult<V> failure(Throwable failure)
+    {
+        return new Immediate<>(failure);
+    }
+
+    public static <V> AsyncResult<V> ofCallable(Executor executor, Callable<V> 
callable)
+    {
+        Settable<V> result = new Settable<V>();
+        executor.execute(() -> {
+            try
+            {
+                result.trySuccess(callable.call());
+            }
+            catch (Exception e)
+            {
+                result.tryFailure(e);
+            }
+        });
+        return result;
+    }
+
+    public static AsyncResult<Void> ofRunnable(Executor executor, Runnable 
runnable)
+    {
+        Settable<Void> result = new Settable<Void>();
+        executor.execute(() -> {
+            try
+            {
+                runnable.run();
+                result.trySuccess(null);
+            }
+            catch (Exception e)
+            {
+                result.tryFailure(e);
+            }
+        });
+        return result;
+    }
+
+    public static <V> AsyncResult.Settable<V> settable()
+    {
+        return new Settable<>();
+    }
+
+    private static <V> List<AsyncChain<V>> toChains(List<AsyncResult<V>> 
results)
+    {
+        List<AsyncChain<V>> chains = new ArrayList<>(results.size());
+        for (int i=0,mi=results.size(); i<mi; i++)
+            chains.add(results.get(i).toChain());
+        return chains;
+    }
+
+    public static <V> AsyncChain<List<V>> all(List<AsyncResult<V>> results)
+    {
+        Preconditions.checkArgument(!results.isEmpty());
+        return new AsyncChainCombiner.All<>(toChains(results));
+    }
+
+    public static <V> AsyncChain<V> reduce(List<AsyncResult<V>> results, 
BiFunction<V, V, V> reducer)
+    {
+        Preconditions.checkArgument(!results.isEmpty());
+        if (results.size() == 1)
+            return results.get(0).toChain();
+        return new AsyncChainCombiner.Reduce<>(toChains(results), reducer);
+    }
+
+    public static <V> V getBlocking(AsyncResult<V> asyncResult) throws 
InterruptedException, ExecutionException
+    {
+        AtomicReference<Result<V>> callbackResult = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);

Review Comment:
   @belliottsmith curious why? I can look more closely at our rewrite logic 
(such as 
`org.apache.cassandra.simulator.asm.GlobalMethodTransformer#visitMethodInsn`), 
but I feel that if we are rewriting all C*/Accord classes we could always 
intercept `new` rather than requiring the `new*` methods to be called. I see 
that in this case we don't extend Java's class and instead add a new 
`org.apache.cassandra.utils.concurrent.CountDownLatch`, which appears to be 
replaced with a call to 
`org.apache.cassandra.simulator.systems.InterceptingGlobalMethods#newCountDownLatch`,
 which returns a 
`org.apache.cassandra.simulator.systems.InterceptingAwaitable.InterceptingCountDownLatch`.
   
   I'm not sure how the simulator deals with this, but the base types I see are 
`org.apache.cassandra.simulator.systems.InterceptingAwaitable` and 
`org.apache.cassandra.simulator.systems.InterceptedWait.TriggerListener`, so I 
feel we could do the same for the base Java concurrent utils.



##########
accord-core/src/main/java/accord/utils/async/AsyncResults.java:
##########
@@ -0,0 +1,309 @@
+package accord.utils.async;
+
+import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+public class AsyncResults
+{
+    private AsyncResults() {}
+
+    private static class Result<V>
+    {
+        final V value;
+        final Throwable failure;
+
+        public Result(V value, Throwable failure)
+        {
+            this.value = value;
+            this.failure = failure;
+        }
+    }
+
+    static class AbstractResult<V> implements AsyncResult<V>
+    {
+        private static final AtomicReferenceFieldUpdater<AbstractResult, 
Object> STATE = AtomicReferenceFieldUpdater.newUpdater(AbstractResult.class, 
Object.class, "state");
+
+        private volatile Object state;
+
+        private static class Listener<V>
+        {
+            final BiConsumer<? super V, Throwable> callback;
+            Listener<V> next;
+
+            public Listener(BiConsumer<? super V, Throwable> callback)
+            {
+                this.callback = callback;
+            }
+        }
+
+        private void notify(Listener<V> listener, Result<V> result)
+        {
+            while (listener != null)
+            {
+                listener.callback.accept(result.value, result.failure);
+                listener = listener.next;
+            }
+        }
+
+        boolean trySetResult(Result<V> result)
+        {
+            while (true)
+            {
+                Object current = state;
+                if (current instanceof Result)
+                    return false;
+                Listener<V> listener = (Listener<V>) current;
+                if (STATE.compareAndSet(this, current, result))
+                {
+                    notify(listener, result);
+                    return true;
+                }
+            }
+        }
+
+        boolean trySetResult(V result, Throwable failure)
+        {
+            return trySetResult(new Result<>(result, failure));
+        }
+
+        void setResult(Result<V> result)
+        {
+            if (!trySetResult(result))
+                throw new IllegalStateException("Result has already been set 
on " + this);
+        }
+
+        void setResult(V result, Throwable failure)
+        {
+            if (!trySetResult(result, failure))
+                throw new IllegalStateException("Result has already been set 
on " + this);
+        }
+
+        @Override
+        public void addCallback(BiConsumer<? super V, Throwable> callback)
+        {
+            Listener<V> listener = null;
+            while (true)
+            {
+                Object current = state;
+                if (current instanceof Result)
+                {
+                    Result<V> result = (Result<V>) current;
+                    callback.accept(result.value, result.failure);
+                    return;
+                }
+                if (listener == null)
+                    listener = new Listener<>(callback);
+
+                listener.next = (Listener<V>) current;
+                if (STATE.compareAndSet(this, current, listener))
+                    return;
+            }
+        }
+
+        @Override
+        public boolean isDone()
+        {
+            return state instanceof Result;
+        }
+
+        @Override
+        public boolean isSuccess()
+        {
+            Object current = state;
+            return current instanceof Result && ((Result) current).failure == 
null;
+        }
+    }
+
+    static class Chain<V> extends AbstractResult<V>
+    {
+        public Chain(AsyncChain<V> chain)
+        {
+            chain.begin(this::setResult);
+        }
+    }
+
+    public static class Settable<V> extends AbstractResult<V> implements 
AsyncResult.Settable<V>
+    {
+
+        @Override
+        public boolean trySuccess(V value)
+        {
+            return trySetResult(value, null);
+        }
+
+        @Override
+        public boolean tryFailure(Throwable throwable)
+        {
+            return trySetResult(null, throwable);
+        }
+    }
+
+    static class Immediate<V> implements AsyncResult<V>
+    {
+        private final V value;
+        private final Throwable failure;
+
+        Immediate(V value)
+        {
+            this.value = value;
+            this.failure = null;
+        }
+
+        Immediate(Throwable failure)
+        {
+            this.value = null;
+            this.failure = failure;
+        }
+
+        @Override
+        public void addCallback(BiConsumer<? super V, Throwable> callback)
+        {
+            callback.accept(value, failure);
+        }
+
+        @Override
+        public boolean isDone()
+        {
+            return true;
+        }
+
+        @Override
+        public boolean isSuccess()
+        {
+            return failure == null;
+        }
+    }
+
+    /**
+     * Creates an AsyncResult for the given chain. This calls begin on the 
supplied chain
+     */
+    public static <V> AsyncResult<V> forChain(AsyncChain<V> chain)
+    {
+        return new Chain<>(chain);
+    }
+
+    public static <V> AsyncResult<V> success(V value)
+    {
+        return new Immediate<>(value);
+    }
+
+    public static <V> AsyncResult<V> failure(Throwable failure)
+    {
+        return new Immediate<>(failure);
+    }
+
+    public static <V> AsyncResult<V> ofCallable(Executor executor, Callable<V> 
callable)
+    {
+        Settable<V> result = new Settable<V>();
+        executor.execute(() -> {
+            try
+            {
+                result.trySuccess(callable.call());
+            }
+            catch (Exception e)
+            {
+                result.tryFailure(e);
+            }
+        });
+        return result;
+    }
+
+    public static AsyncResult<Void> ofRunnable(Executor executor, Runnable 
runnable)
+    {
+        Settable<Void> result = new Settable<Void>();
+        executor.execute(() -> {
+            try
+            {
+                runnable.run();
+                result.trySuccess(null);
+            }
+            catch (Exception e)
+            {
+                result.tryFailure(e);
+            }
+        });
+        return result;
+    }

Review Comment:
   Wondering if we should remove this in favor of `AsyncChains`. This method is 
eager and returns a future of the result, whereas a chain is lazy and lets you 
keep adding to it until you are ready. Users can use 
`AsyncChains.ofCallable(...).beginAsResult()` to get similar semantics.
   
   This code is also dead at the moment, which strengthens the argument for 
removing it.



##########
accord-core/src/main/java/accord/utils/async/AsyncResult.java:
##########
@@ -0,0 +1,86 @@
+package accord.utils.async;
+
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+
+/**
+ * Handle for async computations that supports multiple listeners and 
registering
+ * listeners after the computation has started
+ */
+public interface AsyncResult<V>
+{
+    void addCallback(BiConsumer<? super V, Throwable> callback);
+
+    default void addCallback(Runnable runnable)
+    {
+        addCallback((unused, failure) -> {
+            if (failure == null) runnable.run();
+            else throw new RuntimeException(failure);
+        });
+    }
+
+    default void addCallback(Runnable runnable, Executor executor)
+    {
+        addCallback(AsyncCallbacks.inExecutor(runnable, executor));
+    }
+
+    default AsyncChain<V> toChain()
+    {
+        return new AsyncChains.Head<V>()
+        {
+            @Override
+            public void begin(BiConsumer<? super V, Throwable> callback)
+            {
+                AsyncResult.this.addCallback(callback);
+            }
+        };
+    }
+
+    boolean isDone();
+    boolean isSuccess();
+
+    default void addCallback(BiConsumer<? super V, Throwable> callback, 
Executor executor)
+    {
+        addCallback(AsyncCallbacks.inExecutor(callback, executor));
+    }
+
+    default void addListener(Runnable runnable)
+    {
+        addCallback(runnable);
+    }
+
+    default void addListener(Runnable runnable, Executor executor)
+    {
+        addCallback(runnable, executor);
+    }

Review Comment:
   Can we remove these? They are duplicates of `addCallback`.



##########
accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java:
##########
@@ -0,0 +1,45 @@
+package accord.utils.async;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+
+public class AsyncCallbacks
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AsyncCallbacks.class);
+
+    private static final BiConsumer<Object, Throwable> NOOP = (unused, 
failure) -> {
+        if (failure != null)
+            logger.error("Exception received by noop callback", failure);
+    };
+
+    public static <T> BiConsumer<? super T, Throwable> noop()
+    {
+        return NOOP;
+    }
+
+    public static <T> BiConsumer<? super T, Throwable> inExecutor(BiConsumer<? 
super T, Throwable> callback, Executor executor)

Review Comment:
   nit: we should move `executor` to be the first parameter, as it cleans up 
the call sites when lambdas are used:
   
   current:
   
   ```
   inExecutor((result, throwable) -> {
               if (throwable != null)
                   throw new RuntimeException(throwable);
               runnable.run();
           }, executor);
   ```
   
   desired:
   
   ```
   inExecutor(executor, (result, throwable) -> {
               if (throwable != null)
                   throw new RuntimeException(throwable);
               runnable.run();
           });
   ```



##########
accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java:
##########
@@ -0,0 +1,150 @@
+package accord.utils.async;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+abstract class AsyncChainCombiner<I, O> extends AsyncChains.Head<O>
+{
+    private static final AtomicIntegerFieldUpdater<AsyncChainCombiner> 
REMAINING = AtomicIntegerFieldUpdater.newUpdater(AsyncChainCombiner.class, 
"remaining");
+    private volatile Object state;
+    private volatile BiConsumer<? super O, Throwable> callback;
+    private volatile int remaining;
+
+    protected AsyncChainCombiner(List<AsyncChain<I>> inputs)
+    {
+        Preconditions.checkArgument(!inputs.isEmpty());
+        this.state = inputs;
+    }
+
+    private List<AsyncChain<? extends I>> inputs()
+    {
+        Object current = state;
+        Preconditions.checkState(current instanceof List);
+        return (List<AsyncChain<? extends I>>) current;
+    }
+
+    private I[] results()
+    {
+        Object current = state;
+        Preconditions.checkState(current instanceof Object[]);

Review Comment:
   nit: when this isn't true it's great to know what the class is, so it's 
easier to debug what happened



##########
accord-core/src/main/java/accord/utils/async/AsyncChains.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils.async;
+
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import static accord.utils.async.AsyncChainCombiner.Reduce;
+
+public abstract class AsyncChains<V> implements AsyncChain<V>
+{
+    static class Immediate<V> implements AsyncChain<V>
+    {
+        static class FailureHolder
+        {
+            final Throwable cause;
+            FailureHolder(Throwable cause)
+            {
+                this.cause = cause;
+            }
+        }
+
+        final private Object value;
+        private Immediate(V success) { this.value = success; }
+        private Immediate(Throwable failure) { this.value = new 
FailureHolder(failure); }
+
+        @Override
+        public <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper)
+        {
+            if (value != null && value.getClass() == FailureHolder.class)
+                return (AsyncChain<T>) this;
+            return new Immediate<>(mapper.apply((V) value));
+        }
+
+        @Override
+        public <T> AsyncChain<T> flatMap(Function<? super V, ? extends 
AsyncChain<T>> mapper)
+        {
+            if (value != null && value.getClass() == FailureHolder.class)
+                return (AsyncChain<T>) this;
+            return mapper.apply((V) value);
+        }
+
+        @Override
+        public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> 
callback)
+        {
+            if (value == null || value.getClass() != FailureHolder.class)
+                callback.accept((V) value, null);
+            else
+                callback.accept(null, ((FailureHolder)value).cause);
+            return this;
+        }
+
+        @Override
+        public void begin(BiConsumer<? super V, Throwable> callback)
+        {
+            addCallback(callback);
+        }
+    }
+
+    public abstract static class Head<V> extends AsyncChains<V> implements 
BiConsumer<V, Throwable>
+    {
+        protected Head()
+        {
+            super(null);
+            next = this;
+        }
+
+        void begin()
+        {
+            begin(next);
+        }

Review Comment:
   I believe Benedict fixes this in his branch, but this impl allows calling 
`begin(BiConsumer)` multiple times, which could have bad side effects; we 
should guard against this



##########
accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java:
##########
@@ -0,0 +1,150 @@
+package accord.utils.async;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+abstract class AsyncChainCombiner<I, O> extends AsyncChains.Head<O>
+{
+    private static final AtomicIntegerFieldUpdater<AsyncChainCombiner> 
REMAINING = AtomicIntegerFieldUpdater.newUpdater(AsyncChainCombiner.class, 
"remaining");
+    private volatile Object state;
+    private volatile BiConsumer<? super O, Throwable> callback;
+    private volatile int remaining;
+
+    protected AsyncChainCombiner(List<AsyncChain<I>> inputs)
+    {
+        Preconditions.checkArgument(!inputs.isEmpty());
+        this.state = inputs;
+    }
+
+    private List<AsyncChain<? extends I>> inputs()
+    {
+        Object current = state;
+        Preconditions.checkState(current instanceof List);
+        return (List<AsyncChain<? extends I>>) current;
+    }
+
+    private I[] results()
+    {
+        Object current = state;
+        Preconditions.checkState(current instanceof Object[]);
+        return (I[]) current;
+    }
+
+    void add(AsyncChain<I> chain)
+    {
+        inputs().add(chain);
+    }
+
+    void addAll(List<AsyncChain<I>> chains)
+    {
+        inputs().addAll(chains);
+    }
+
+    int size()
+    {
+        Object current = state;
+        if (current instanceof List)
+            return ((List) current).size();
+        if (current instanceof Object[])
+            return ((Object[]) current).length;
+        throw new IllegalStateException();

Review Comment:
   nit: when this isn't true it's great to know what the class is, so it's 
easier to debug what happened



##########
accord-core/src/main/java/accord/utils/async/AsyncChain.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils.async;
+
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public interface AsyncChain<V>
+{
+    /**
+     * Support {@link 
com.google.common.util.concurrent.Futures#transform(ListenableFuture, 
com.google.common.base.Function, Executor)} natively
+     */
+    <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper);
+
+    /**
+     * Support {@link 
com.google.common.util.concurrent.Futures#transform(ListenableFuture, 
com.google.common.base.Function, Executor)} natively
+     */
+    <T> AsyncChain<T> flatMap(Function<? super V, ? extends AsyncChain<T>> 
mapper);
+
+    /**
+     * Support {@link com.google.common.util.concurrent.Futures#addCallback} 
natively
+     */
+    AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback);
+
+    void begin(BiConsumer<? super V, Throwable> callback);
+
+    default void begin(Runnable runnable)
+    {
+        begin((unused, failure) -> {
+            if (failure == null) runnable.run();
+            else throw new RuntimeException(failure);
+        });

Review Comment:
   made a comment in 
`accord.utils.async.AsyncCallbacks#inExecutor(java.lang.Runnable, 
java.util.concurrent.Executor)` that we should refactor to use 
`accord.utils.async.AsyncCallbacks#inExecutor(java.util.function.BiConsumer<? 
super T,java.lang.Throwable>, java.util.concurrent.Executor)`... If we do this, 
then we can have the following
   
   ```
   // in AsyncCallbacks
   public static <T> BiConsumer<? super T, Throwable> inExecutor(Runnable 
runnable, Executor executor)
       {
           return inExecutor(wrap(runnable), executor);
       }
   
       public static <T> BiConsumer<T, Throwable> wrap(Runnable runnable)
       {
           return (result, throwable) -> {
               if (throwable != null)
                   throw new RuntimeException(throwable);
               runnable.run();
           };
       }
   
   // here
   begin(AsyncCallbacks.wrap(runnable));
   ```



##########
accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java:
##########
@@ -0,0 +1,45 @@
+package accord.utils.async;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+
+public class AsyncCallbacks
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AsyncCallbacks.class);
+
+    private static final BiConsumer<Object, Throwable> NOOP = (unused, 
failure) -> {
+        if (failure != null)
+            logger.error("Exception received by noop callback", failure);
+    };
+
+    public static <T> BiConsumer<? super T, Throwable> noop()
+    {
+        return NOOP;
+    }
+
+    public static <T> BiConsumer<? super T, Throwable> inExecutor(BiConsumer<? 
super T, Throwable> callback, Executor executor)
+    {
+        return (result, throwable) -> {
+            try
+            {
+                executor.execute(() -> callback.accept(result, throwable));
+            }
+            catch (Throwable t)
+            {
+                callback.accept(null, t);
+            }
+        };
+    }
+
+
+    public static <T> BiConsumer<? super T, Throwable> inExecutor(Runnable 
runnable, Executor executor)
+    {
+        return (result, throwable) -> {
+            if (throwable == null) executor.execute(runnable);
+            else throw new RuntimeException(throwable);
+        };
+    }
+}

Review Comment:
   should change to be
   
   ```
   public static <T> BiConsumer<? super T, Throwable> inExecutor(Runnable 
runnable, Executor executor)
       {
           return inExecutor((result, throwable) -> {
               if (throwable != null) 
                   throw new RuntimeException(throwable);
               runnable.run();
           }, executor);
       }
   ```
   
   The other method already handles the case where `executor.execute` throws 
(which it can), so delegating to it simplifies this one.



##########
accord-core/src/main/java/accord/utils/async/AsyncChains.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils.async;
+
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import static accord.utils.async.AsyncChainCombiner.Reduce;
+
+public abstract class AsyncChains<V> implements AsyncChain<V>
+{
+    static class Immediate<V> implements AsyncChain<V>
+    {
+        static class FailureHolder
+        {
+            final Throwable cause;
+            FailureHolder(Throwable cause)
+            {
+                this.cause = cause;
+            }
+        }
+
+        final private Object value;
+        private Immediate(V success) { this.value = success; }
+        private Immediate(Throwable failure) { this.value = new 
FailureHolder(failure); }
+
+        @Override
+        public <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper)
+        {
+            if (value != null && value.getClass() == FailureHolder.class)
+                return (AsyncChain<T>) this;
+            return new Immediate<>(mapper.apply((V) value));
+        }
+
+        @Override
+        public <T> AsyncChain<T> flatMap(Function<? super V, ? extends 
AsyncChain<T>> mapper)
+        {
+            if (value != null && value.getClass() == FailureHolder.class)
+                return (AsyncChain<T>) this;
+            return mapper.apply((V) value);
+        }
+
+        @Override
+        public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> 
callback)
+        {
+            if (value == null || value.getClass() != FailureHolder.class)
+                callback.accept((V) value, null);
+            else
+                callback.accept(null, ((FailureHolder)value).cause);
+            return this;
+        }
+
+        @Override
+        public void begin(BiConsumer<? super V, Throwable> callback)
+        {
+            addCallback(callback);
+        }
+    }
+
+    public abstract static class Head<V> extends AsyncChains<V> implements 
BiConsumer<V, Throwable>
+    {
+        protected Head()
+        {
+            super(null);
+            next = this;
+        }
+
+        void begin()
+        {
+            begin(next);
+        }
+
+        @Override
+        public void accept(V v, Throwable throwable)
+        {
+            // we implement here just to simplify logic a little
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    static abstract class Link<I, O> extends AsyncChains<O> implements 
BiConsumer<I, Throwable>
+    {
+        protected Link(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void begin(BiConsumer<? super O, Throwable> callback)
+        {
+            Preconditions.checkArgument(!(callback instanceof 
AsyncChains.Head));
+            Preconditions.checkState(next instanceof AsyncChains.Head);
+            Head<?> head = (Head<?>) next;
+            next = callback;
+            head.begin();
+        }
+    }
+
+    public static abstract class Map<I, O> extends Link<I, O> implements 
Function<I, O>
+    {
+        Map(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            if (throwable != null) next.accept(null, throwable);
+            else next.accept(apply(i), null);
+        }
+    }
+
+    static class EncapsulatedMap<I, O> extends Map<I, O>
+    {
+        final Function<? super I, ? extends O> map;
+
+        EncapsulatedMap(Head<?> head, Function<? super I, ? extends O> map)
+        {
+            super(head);
+            this.map = map;
+        }
+
+        @Override
+        public O apply(I i)
+        {
+            return map.apply(i);
+        }
+    }
+
+    public static abstract class FlatMap<I, O> extends Link<I, O> implements 
Function<I, AsyncChain<O>>
+    {
+        FlatMap(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            if (throwable != null) next.accept(null, throwable);
+            else apply(i).begin(next);
+        }
+    }
+
+    static class EncapsulatedFlatMap<I, O> extends FlatMap<I, O>
+    {
+        final Function<? super I, ? extends AsyncChain<O>> map;
+
+        EncapsulatedFlatMap(Head<?> head, Function<? super I, ? extends 
AsyncChain<O>> map)
+        {
+            super(head);
+            this.map = map;
+        }
+
+        @Override
+        public AsyncChain<O> apply(I i)
+        {
+            return map.apply(i);
+        }
+    }
+
+    // if extending Callback, be sure to invoke super.accept()
+    static class Callback<I> extends Link<I, I>
+    {
+        Callback(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            next.accept(i, throwable);
+        }
+    }
+
+    static class EncapsulatedCallback<I> extends Callback<I>
+    {
+        final BiConsumer<? super I, Throwable> callback;
+
+        EncapsulatedCallback(Head<?> head, BiConsumer<? super I, Throwable> 
callback)
+        {
+            super(head);
+            this.callback = callback;
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            super.accept(i, throwable);
+            callback.accept(i, throwable);
+        }
+    }
+
+    // either the thing we start, or the thing we do in follow-up
+    BiConsumer<? super V, Throwable> next;
+    AsyncChains(Head<?> head)
+    {
+        this.next = (BiConsumer) head;
+    }
+
+    @Override
+    public <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper)
+    {
+        return add(EncapsulatedMap::new, mapper);
+    }
+
+    @Override
+    public <T> AsyncChain<T> flatMap(Function<? super V, ? extends 
AsyncChain<T>> mapper)
+    {
+        return add(EncapsulatedFlatMap::new, mapper);
+    }
+
+    @Override
+    public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback)
+    {
+        return add(EncapsulatedCallback::new, callback);
+    }
+
+    // can be used by transformations that want efficiency, and can directly 
extend Link, FlatMap or Callback
+    // (or perhaps some additional helper implementations that permit us to 
simply implement apply for Map and FlatMap)
+    <O, T extends AsyncChain<O> & BiConsumer<? super V, Throwable>> 
AsyncChain<O> add(Function<Head<?>, T> factory)
+    {
+        Preconditions.checkState(next instanceof Head<?>);
+        Head<?> head = (Head<?>) next;
+        T result = factory.apply(head);
+        next = result;
+        return result;
+    }
+
+    <P, O, T extends AsyncChain<O> & BiConsumer<? super V, Throwable>> 
AsyncChain<O> add(BiFunction<Head<?>, P, T> factory, P param)
+    {
+        Preconditions.checkState(next instanceof Head<?>);

Review Comment:
   nit: would be great to have useful error messages



##########
accord-core/src/main/java/accord/utils/async/AsyncChains.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils.async;
+
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import static accord.utils.async.AsyncChainCombiner.Reduce;
+
+public abstract class AsyncChains<V> implements AsyncChain<V>
+{
+    static class Immediate<V> implements AsyncChain<V>
+    {
+        static class FailureHolder
+        {
+            final Throwable cause;
+            FailureHolder(Throwable cause)
+            {
+                this.cause = cause;
+            }
+        }
+
+        final private Object value;
+        private Immediate(V success) { this.value = success; }
+        private Immediate(Throwable failure) { this.value = new 
FailureHolder(failure); }
+
+        @Override
+        public <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper)
+        {
+            if (value != null && value.getClass() == FailureHolder.class)
+                return (AsyncChain<T>) this;
+            return new Immediate<>(mapper.apply((V) value));
+        }
+
+        @Override
+        public <T> AsyncChain<T> flatMap(Function<? super V, ? extends 
AsyncChain<T>> mapper)
+        {
+            if (value != null && value.getClass() == FailureHolder.class)
+                return (AsyncChain<T>) this;
+            return mapper.apply((V) value);
+        }
+
+        @Override
+        public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> 
callback)
+        {
+            if (value == null || value.getClass() != FailureHolder.class)
+                callback.accept((V) value, null);
+            else
+                callback.accept(null, ((FailureHolder)value).cause);
+            return this;
+        }
+
+        @Override
+        public void begin(BiConsumer<? super V, Throwable> callback)
+        {
+            addCallback(callback);
+        }
+    }
+
+    public abstract static class Head<V> extends AsyncChains<V> implements 
BiConsumer<V, Throwable>
+    {
+        protected Head()
+        {
+            super(null);
+            next = this;
+        }
+
+        void begin()
+        {
+            begin(next);
+        }
+
+        @Override
+        public void accept(V v, Throwable throwable)
+        {
+            // we implement here just to simplify logic a little
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    static abstract class Link<I, O> extends AsyncChains<O> implements 
BiConsumer<I, Throwable>
+    {
+        protected Link(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void begin(BiConsumer<? super O, Throwable> callback)
+        {
+            Preconditions.checkArgument(!(callback instanceof 
AsyncChains.Head));
+            Preconditions.checkState(next instanceof AsyncChains.Head);
+            Head<?> head = (Head<?>) next;
+            next = callback;
+            head.begin();
+        }
+    }
+
+    public static abstract class Map<I, O> extends Link<I, O> implements 
Function<I, O>
+    {
+        Map(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            if (throwable != null) next.accept(null, throwable);
+            else next.accept(apply(i), null);
+        }
+    }
+
+    static class EncapsulatedMap<I, O> extends Map<I, O>
+    {
+        final Function<? super I, ? extends O> map;
+
+        EncapsulatedMap(Head<?> head, Function<? super I, ? extends O> map)
+        {
+            super(head);
+            this.map = map;
+        }
+
+        @Override
+        public O apply(I i)
+        {
+            return map.apply(i);
+        }
+    }
+
+    public static abstract class FlatMap<I, O> extends Link<I, O> implements 
Function<I, AsyncChain<O>>
+    {
+        FlatMap(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            if (throwable != null) next.accept(null, throwable);
+            else apply(i).begin(next);
+        }
+    }
+
+    static class EncapsulatedFlatMap<I, O> extends FlatMap<I, O>
+    {
+        final Function<? super I, ? extends AsyncChain<O>> map;
+
+        EncapsulatedFlatMap(Head<?> head, Function<? super I, ? extends 
AsyncChain<O>> map)
+        {
+            super(head);
+            this.map = map;
+        }
+
+        @Override
+        public AsyncChain<O> apply(I i)
+        {
+            return map.apply(i);
+        }
+    }
+
+    // if extending Callback, be sure to invoke super.accept()
+    static class Callback<I> extends Link<I, I>
+    {
+        Callback(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            next.accept(i, throwable);
+        }
+    }
+
+    static class EncapsulatedCallback<I> extends Callback<I>
+    {
+        final BiConsumer<? super I, Throwable> callback;
+
+        EncapsulatedCallback(Head<?> head, BiConsumer<? super I, Throwable> 
callback)
+        {
+            super(head);
+            this.callback = callback;
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            super.accept(i, throwable);
+            callback.accept(i, throwable);
+        }
+    }
+
+    // either the thing we start, or the thing we do in follow-up
+    BiConsumer<? super V, Throwable> next;
+    AsyncChains(Head<?> head)
+    {
+        this.next = (BiConsumer) head;
+    }
+
+    @Override
+    public <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper)
+    {
+        return add(EncapsulatedMap::new, mapper);
+    }
+
+    @Override
+    public <T> AsyncChain<T> flatMap(Function<? super V, ? extends 
AsyncChain<T>> mapper)
+    {
+        return add(EncapsulatedFlatMap::new, mapper);
+    }
+
+    @Override
+    public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback)
+    {
+        return add(EncapsulatedCallback::new, callback);
+    }
+
+    // can be used by transformations that want efficiency, and can directly 
extend Link, FlatMap or Callback
+    // (or perhaps some additional helper implementations that permit us to 
simply implement apply for Map and FlatMap)
+    <O, T extends AsyncChain<O> & BiConsumer<? super V, Throwable>> 
AsyncChain<O> add(Function<Head<?>, T> factory)
+    {
+        Preconditions.checkState(next instanceof Head<?>);

Review Comment:
   nit: would be great to have useful error messages



##########
accord-core/src/main/java/accord/utils/async/AsyncChain.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils.async;
+
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public interface AsyncChain<V>
+{
+    /**
+     * Support {@link 
com.google.common.util.concurrent.Futures#transform(ListenableFuture, 
com.google.common.base.Function, Executor)} natively
+     */
+    <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper);
+
+    /**
+     * Support {@link 
com.google.common.util.concurrent.Futures#transform(ListenableFuture, 
com.google.common.base.Function, Executor)} natively
+     */
+    <T> AsyncChain<T> flatMap(Function<? super V, ? extends AsyncChain<T>> 
mapper);
+
+    /**
+     * Support {@link com.google.common.util.concurrent.Futures#addCallback} 
natively
+     */
+    AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback);
+
+    void begin(BiConsumer<? super V, Throwable> callback);

Review Comment:
   IMO we should define and document the expected semantics, as this may be 
error-prone for people familiar with `Future` (I left this comment about `Head` as well)...
   
   *should* `begin` allow multiple calls, or should this be rejected?  The 
current implementations (other than `Head`) reject multiple calls, which I feel 
is the desired behavior in all cases



##########
accord-core/src/main/java/accord/utils/async/AsyncChains.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils.async;
+
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import static accord.utils.async.AsyncChainCombiner.Reduce;
+
+public abstract class AsyncChains<V> implements AsyncChain<V>
+{
+    static class Immediate<V> implements AsyncChain<V>
+    {
+        static class FailureHolder
+        {
+            final Throwable cause;
+            FailureHolder(Throwable cause)
+            {
+                this.cause = cause;
+            }
+        }
+
+        final private Object value;
+        private Immediate(V success) { this.value = success; }
+        private Immediate(Throwable failure) { this.value = new 
FailureHolder(failure); }
+
+        @Override
+        public <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper)
+        {
+            if (value != null && value.getClass() == FailureHolder.class)
+                return (AsyncChain<T>) this;
+            return new Immediate<>(mapper.apply((V) value));
+        }
+
+        @Override
+        public <T> AsyncChain<T> flatMap(Function<? super V, ? extends 
AsyncChain<T>> mapper)
+        {
+            if (value != null && value.getClass() == FailureHolder.class)
+                return (AsyncChain<T>) this;
+            return mapper.apply((V) value);
+        }
+
+        @Override
+        public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> 
callback)
+        {
+            if (value == null || value.getClass() != FailureHolder.class)
+                callback.accept((V) value, null);
+            else
+                callback.accept(null, ((FailureHolder)value).cause);
+            return this;
+        }
+
+        @Override
+        public void begin(BiConsumer<? super V, Throwable> callback)
+        {
+            addCallback(callback);
+        }
+    }
+
+    public abstract static class Head<V> extends AsyncChains<V> implements 
BiConsumer<V, Throwable>
+    {
+        protected Head()
+        {
+            super(null);
+            next = this;
+        }
+
+        void begin()
+        {
+            begin(next);
+        }
+
+        @Override
+        public void accept(V v, Throwable throwable)
+        {
+            // we implement here just to simplify logic a little
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    static abstract class Link<I, O> extends AsyncChains<O> implements 
BiConsumer<I, Throwable>
+    {
+        protected Link(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void begin(BiConsumer<? super O, Throwable> callback)
+        {
+            Preconditions.checkArgument(!(callback instanceof 
AsyncChains.Head));
+            Preconditions.checkState(next instanceof AsyncChains.Head);
+            Head<?> head = (Head<?>) next;
+            next = callback;
+            head.begin();
+        }
+    }
+
+    public static abstract class Map<I, O> extends Link<I, O> implements 
Function<I, O>
+    {
+        Map(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            if (throwable != null) next.accept(null, throwable);
+            else next.accept(apply(i), null);
+        }
+    }
+
+    static class EncapsulatedMap<I, O> extends Map<I, O>
+    {
+        final Function<? super I, ? extends O> map;
+
+        EncapsulatedMap(Head<?> head, Function<? super I, ? extends O> map)
+        {
+            super(head);
+            this.map = map;
+        }
+
+        @Override
+        public O apply(I i)
+        {
+            return map.apply(i);
+        }
+    }
+
+    public static abstract class FlatMap<I, O> extends Link<I, O> implements 
Function<I, AsyncChain<O>>
+    {
+        FlatMap(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            if (throwable != null) next.accept(null, throwable);
+            else apply(i).begin(next);
+        }
+    }
+
+    static class EncapsulatedFlatMap<I, O> extends FlatMap<I, O>
+    {
+        final Function<? super I, ? extends AsyncChain<O>> map;
+
+        EncapsulatedFlatMap(Head<?> head, Function<? super I, ? extends 
AsyncChain<O>> map)
+        {
+            super(head);
+            this.map = map;
+        }
+
+        @Override
+        public AsyncChain<O> apply(I i)
+        {
+            return map.apply(i);
+        }
+    }
+
+    // if extending Callback, be sure to invoke super.accept()
+    static class Callback<I> extends Link<I, I>
+    {
+        Callback(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            next.accept(i, throwable);
+        }
+    }
+
+    static class EncapsulatedCallback<I> extends Callback<I>
+    {
+        final BiConsumer<? super I, Throwable> callback;
+
+        EncapsulatedCallback(Head<?> head, BiConsumer<? super I, Throwable> 
callback)
+        {
+            super(head);
+            this.callback = callback;
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            super.accept(i, throwable);
+            callback.accept(i, throwable);
+        }
+    }
+
+    // either the thing we start, or the thing we do in follow-up
+    BiConsumer<? super V, Throwable> next;
+    AsyncChains(Head<?> head)
+    {
+        this.next = (BiConsumer) head;
+    }
+
+    @Override
+    public <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper)
+    {
+        return add(EncapsulatedMap::new, mapper);
+    }
+
+    @Override
+    public <T> AsyncChain<T> flatMap(Function<? super V, ? extends 
AsyncChain<T>> mapper)
+    {
+        return add(EncapsulatedFlatMap::new, mapper);
+    }
+
+    @Override
+    public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback)
+    {
+        return add(EncapsulatedCallback::new, callback);
+    }
+
+    // can be used by transformations that want efficiency, and can directly 
extend Link, FlatMap or Callback
+    // (or perhaps some additional helper implementations that permit us to 
simply implement apply for Map and FlatMap)
+    <O, T extends AsyncChain<O> & BiConsumer<? super V, Throwable>> 
AsyncChain<O> add(Function<Head<?>, T> factory)
+    {
+        Preconditions.checkState(next instanceof Head<?>);
+        Head<?> head = (Head<?>) next;
+        T result = factory.apply(head);
+        next = result;
+        return result;
+    }
+
+    <P, O, T extends AsyncChain<O> & BiConsumer<? super V, Throwable>> 
AsyncChain<O> add(BiFunction<Head<?>, P, T> factory, P param)
+    {
+        Preconditions.checkState(next instanceof Head<?>);
+        Head<?> head = (Head<?>) next;
+        T result = factory.apply(head, param);
+        next = result;
+        return result;
+    }
+
+    private static <V> Runnable encapsulate(Callable<V> callable, BiConsumer<? 
super V, Throwable> receiver)
+    {
+        return () -> {
+            try
+            {
+                V result = callable.call();
+                receiver.accept(result, null);
+            }
+            catch (Throwable t)
+            {
+                receiver.accept(null, t);
+            }
+        };
+    }
+
+    private static Runnable encapsulate(Runnable runnable, BiConsumer<? super 
Void, Throwable> receiver)
+    {
+        return () -> {
+            try
+            {
+                runnable.run();
+                receiver.accept(null, null);
+            }
+            catch (Throwable t)
+            {
+                receiver.accept(null, t);
+            }
+        };
+    }
+
+    public static <V> AsyncChain<V> success(V success)
+    {
+        return new Immediate<>(success);
+    }
+
+    public static <V> AsyncChain<V> failure(Throwable failure)
+    {
+        return new Immediate<>(failure);
+    }
+
+    public static <V> AsyncChain<V> ofCallable(Executor executor, Callable<V> 
callable)
+    {
+        return new Head<V>()
+        {
+            @Override
+            public void begin(BiConsumer<? super V, Throwable> next)
+            {
+                executor.execute(encapsulate(callable, next));
+            }
+        };
+    }
+
+    public static AsyncChain<Void> ofRunnable(Executor executor, Runnable 
runnable)
+    {
+        return new Head<Void>()
+        {
+            @Override
+            public void begin(BiConsumer<? super Void, Throwable> callback)
+            {
+                executor.execute(AsyncChains.encapsulate(runnable, callback));
+            }
+        };
+    }
+
+    public static <V> AsyncChain<List<V>> all(List<AsyncChain<V>> chains)
+    {
+        Preconditions.checkArgument(!chains.isEmpty());
+        return new AsyncChainCombiner.All<>(chains);
+    }
+
+    public static <V> AsyncChain<V> reduce(List<AsyncChain<V>> chains, 
BiFunction<V, V, V> reducer)
+    {
+        Preconditions.checkArgument(!chains.isEmpty());
+        if (chains.size() == 1)
+            return chains.get(0);
+        if (Reduce.canAppendTo(chains.get(0), reducer))
+        {
+            AsyncChainCombiner.Reduce<V> appendTo = 
(AsyncChainCombiner.Reduce<V>) chains.get(0);
+            appendTo.addAll(chains.subList(1, chains.size()));
+            return appendTo;
+        }
+        return new Reduce<>(chains, reducer);
+    }
+
+    public static <V> AsyncChain<V> reduce(AsyncChain<V> a, AsyncChain<V> b, 
BiFunction<V, V, V> reducer)
+    {
+        if (Reduce.canAppendTo(a, reducer))
+        {
+            AsyncChainCombiner.Reduce<V> appendTo = 
(AsyncChainCombiner.Reduce<V>) a;
+            appendTo.add(b);
+            return a;
+        }
+        return new Reduce<>(Lists.newArrayList(a, b), reducer);
+    }
+
+    public static <V> V getBlocking(AsyncChain<V> chain) throws 
InterruptedException, ExecutionException
+    {
+        class Result
+        {
+            final V result;
+            final Throwable failure;
+
+            public Result(V result, Throwable failure)
+            {
+                this.result = result;
+                this.failure = failure;
+            }
+        }
+
+        AtomicReference<Result> callbackResult = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        chain.begin((result, failure) -> {
+            callbackResult.set(new Result(result, failure));
+            latch.countDown();
+        });
+
+        latch.await();
+        Result result = callbackResult.get();
+        if (result.failure == null) return result.result;
+        else throw new ExecutionException(result.failure);
+    }
+
+    public static <V> V getBlocking(AsyncChain<V> chain, long timeout, 
TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
+    {
+        class Result
+        {
+            final V result;
+            final Throwable failure;
+
+            public Result(V result, Throwable failure)
+            {
+                this.result = result;
+                this.failure = failure;
+            }
+        }
+
+        AtomicReference<Result> callbackResult = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        chain.begin((result, failure) -> {
+            callbackResult.set(new Result(result, failure));
+            latch.countDown();
+        });
+
+        if (!latch.await(timeout, unit))
+            throw new TimeoutException();
+        Result result = callbackResult.get();
+        if (result.failure == null) return result.result;
+        else throw new ExecutionException(result.failure);
+    }
+
+    public static <V> V getUninterruptibly(AsyncChain<V> chain)
+    {
+        try
+        {
+            return getBlocking(chain);
+        }
+        catch (ExecutionException | InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static <V> void awaitUninterruptibly(AsyncChain<V> chain)
+    {
+        getUninterruptibly(chain);
+    }

Review Comment:
   This might not be what people expect; I think it should be:
   
   ```
   public static <V> V getUninterruptibly(AsyncChain<V> chain) throws 
ExecutionException
       {
           boolean interrupted = false;
           try
           {
               while (true)
               {
                   try
                   {
                       return getBlocking(chain);
                   }
                   catch (InterruptedException e)
                   {
                       interrupted = true;
                   }
               }
           }
           finally
           {
               if (interrupted)
                   Thread.currentThread().interrupt();
           }
       }
   
       public static <V> void awaitUninterruptibly(AsyncChain<V> chain) throws 
ExecutionException
       {
           getUninterruptibly(chain);
       }
   ```



##########
accord-core/src/main/java/accord/utils/async/AsyncChains.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils.async;
+
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import static accord.utils.async.AsyncChainCombiner.Reduce;
+
+public abstract class AsyncChains<V> implements AsyncChain<V>
+{
+    static class Immediate<V> implements AsyncChain<V>
+    {
+        static class FailureHolder
+        {
+            final Throwable cause;
+            FailureHolder(Throwable cause)
+            {
+                this.cause = cause;
+            }
+        }
+
+        final private Object value;
+        private Immediate(V success) { this.value = success; }
+        private Immediate(Throwable failure) { this.value = new 
FailureHolder(failure); }
+
+        @Override
+        public <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper)
+        {
+            if (value != null && value.getClass() == FailureHolder.class)
+                return (AsyncChain<T>) this;
+            return new Immediate<>(mapper.apply((V) value));
+        }
+
+        @Override
+        public <T> AsyncChain<T> flatMap(Function<? super V, ? extends 
AsyncChain<T>> mapper)
+        {
+            if (value != null && value.getClass() == FailureHolder.class)
+                return (AsyncChain<T>) this;
+            return mapper.apply((V) value);
+        }
+
+        @Override
+        public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> 
callback)
+        {
+            if (value == null || value.getClass() != FailureHolder.class)
+                callback.accept((V) value, null);
+            else
+                callback.accept(null, ((FailureHolder)value).cause);
+            return this;
+        }
+
+        @Override
+        public void begin(BiConsumer<? super V, Throwable> callback)
+        {
+            addCallback(callback);
+        }
+    }
+
+    public abstract static class Head<V> extends AsyncChains<V> implements 
BiConsumer<V, Throwable>
+    {
+        protected Head()
+        {
+            super(null);
+            next = this;
+        }
+
+        void begin()
+        {
+            begin(next);
+        }
+
+        @Override
+        public void accept(V v, Throwable throwable)
+        {
+            // we implement here just to simplify logic a little
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    static abstract class Link<I, O> extends AsyncChains<O> implements 
BiConsumer<I, Throwable>
+    {
+        protected Link(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void begin(BiConsumer<? super O, Throwable> callback)
+        {
+            Preconditions.checkArgument(!(callback instanceof 
AsyncChains.Head));
+            Preconditions.checkState(next instanceof AsyncChains.Head);
+            Head<?> head = (Head<?>) next;
+            next = callback;
+            head.begin();
+        }
+    }
+
+    public static abstract class Map<I, O> extends Link<I, O> implements 
Function<I, O>
+    {
+        Map(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            if (throwable != null) next.accept(null, throwable);
+            else next.accept(apply(i), null);
+        }
+    }
+
+    static class EncapsulatedMap<I, O> extends Map<I, O>
+    {
+        final Function<? super I, ? extends O> map;
+
+        EncapsulatedMap(Head<?> head, Function<? super I, ? extends O> map)
+        {
+            super(head);
+            this.map = map;
+        }
+
+        @Override
+        public O apply(I i)
+        {
+            return map.apply(i);
+        }
+    }
+
+    public static abstract class FlatMap<I, O> extends Link<I, O> implements 
Function<I, AsyncChain<O>>
+    {
+        FlatMap(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            if (throwable != null) next.accept(null, throwable);
+            else apply(i).begin(next);
+        }
+    }
+
+    static class EncapsulatedFlatMap<I, O> extends FlatMap<I, O>
+    {
+        final Function<? super I, ? extends AsyncChain<O>> map;
+
+        EncapsulatedFlatMap(Head<?> head, Function<? super I, ? extends 
AsyncChain<O>> map)
+        {
+            super(head);
+            this.map = map;
+        }
+
+        @Override
+        public AsyncChain<O> apply(I i)
+        {
+            return map.apply(i);
+        }
+    }
+
+    // if extending Callback, be sure to invoke super.accept()
+    static class Callback<I> extends Link<I, I>
+    {
+        Callback(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            next.accept(i, throwable);
+        }
+    }
+
+    static class EncapsulatedCallback<I> extends Callback<I>
+    {
+        final BiConsumer<? super I, Throwable> callback;
+
+        EncapsulatedCallback(Head<?> head, BiConsumer<? super I, Throwable> 
callback)
+        {
+            super(head);
+            this.callback = callback;
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            super.accept(i, throwable);
+            callback.accept(i, throwable);
+        }
+    }
+
+    // either the thing we start, or the thing we do in follow-up
+    BiConsumer<? super V, Throwable> next;
+    AsyncChains(Head<?> head)
+    {
+        this.next = (BiConsumer) head;
+    }
+
+    @Override
+    public <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper)
+    {
+        return add(EncapsulatedMap::new, mapper);
+    }
+
+    @Override
+    public <T> AsyncChain<T> flatMap(Function<? super V, ? extends 
AsyncChain<T>> mapper)
+    {
+        return add(EncapsulatedFlatMap::new, mapper);
+    }
+
+    @Override
+    public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback)
+    {
+        return add(EncapsulatedCallback::new, callback);
+    }
+
+    // can be used by transformations that want efficiency, and can directly 
extend Link, FlatMap or Callback
+    // (or perhaps some additional helper implementations that permit us to 
simply implement apply for Map and FlatMap)
+    <O, T extends AsyncChain<O> & BiConsumer<? super V, Throwable>> 
AsyncChain<O> add(Function<Head<?>, T> factory)
+    {
+        Preconditions.checkState(next instanceof Head<?>);
+        Head<?> head = (Head<?>) next;
+        T result = factory.apply(head);
+        next = result;
+        return result;
+    }
+
+    <P, O, T extends AsyncChain<O> & BiConsumer<? super V, Throwable>> 
AsyncChain<O> add(BiFunction<Head<?>, P, T> factory, P param)
+    {
+        Preconditions.checkState(next instanceof Head<?>);
+        Head<?> head = (Head<?>) next;
+        T result = factory.apply(head, param);
+        next = result;
+        return result;
+    }
+
+    private static <V> Runnable encapsulate(Callable<V> callable, BiConsumer<? 
super V, Throwable> receiver)
+    {
+        return () -> {
+            try
+            {
+                V result = callable.call();
+                receiver.accept(result, null);
+            }
+            catch (Throwable t)
+            {
+                receiver.accept(null, t);
+            }
+        };
+    }
+
+    private static Runnable encapsulate(Runnable runnable, BiConsumer<? super 
Void, Throwable> receiver)
+    {
+        return () -> {
+            try
+            {
+                runnable.run();
+                receiver.accept(null, null);
+            }
+            catch (Throwable t)
+            {
+                receiver.accept(null, t);
+            }
+        };
+    }
+
+    public static <V> AsyncChain<V> success(V success)
+    {
+        return new Immediate<>(success);
+    }
+
+    public static <V> AsyncChain<V> failure(Throwable failure)
+    {
+        return new Immediate<>(failure);
+    }
+
+    public static <V> AsyncChain<V> ofCallable(Executor executor, Callable<V> 
callable)
+    {
+        return new Head<V>()
+        {
+            @Override
+            public void begin(BiConsumer<? super V, Throwable> next)
+            {
+                executor.execute(encapsulate(callable, next));
+            }
+        };
+    }
+
+    public static AsyncChain<Void> ofRunnable(Executor executor, Runnable 
runnable)
+    {
+        return new Head<Void>()
+        {
+            @Override
+            public void begin(BiConsumer<? super Void, Throwable> callback)
+            {
+                executor.execute(AsyncChains.encapsulate(runnable, callback));
+            }
+        };
+    }
+
+    public static <V> AsyncChain<List<V>> all(List<AsyncChain<V>> chains)
+    {
+        Preconditions.checkArgument(!chains.isEmpty());
+        return new AsyncChainCombiner.All<>(chains);
+    }
+
+    public static <V> AsyncChain<V> reduce(List<AsyncChain<V>> chains, 
BiFunction<V, V, V> reducer)
+    {
+        Preconditions.checkArgument(!chains.isEmpty());
+        if (chains.size() == 1)
+            return chains.get(0);
+        if (Reduce.canAppendTo(chains.get(0), reducer))
+        {
+            AsyncChainCombiner.Reduce<V> appendTo = 
(AsyncChainCombiner.Reduce<V>) chains.get(0);
+            appendTo.addAll(chains.subList(1, chains.size()));
+            return appendTo;
+        }
+        return new Reduce<>(chains, reducer);
+    }
+
+    public static <V> AsyncChain<V> reduce(AsyncChain<V> a, AsyncChain<V> b, 
BiFunction<V, V, V> reducer)
+    {
+        if (Reduce.canAppendTo(a, reducer))
+        {
+            AsyncChainCombiner.Reduce<V> appendTo = 
(AsyncChainCombiner.Reduce<V>) a;
+            appendTo.add(b);
+            return a;
+        }
+        return new Reduce<>(Lists.newArrayList(a, b), reducer);
+    }
+
+    public static <V> V getBlocking(AsyncChain<V> chain) throws 
InterruptedException, ExecutionException
+    {
+        class Result
+        {
+            final V result;
+            final Throwable failure;
+
+            public Result(V result, Throwable failure)
+            {
+                this.result = result;
+                this.failure = failure;
+            }
+        }
+
+        AtomicReference<Result> callbackResult = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        chain.begin((result, failure) -> {
+            callbackResult.set(new Result(result, failure));
+            latch.countDown();
+        });
+
+        latch.await();
+        Result result = callbackResult.get();
+        if (result.failure == null) return result.result;
+        else throw new ExecutionException(result.failure);
+    }
+
+    public static <V> V getBlocking(AsyncChain<V> chain, long timeout, 
TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
+    {
+        class Result
+        {
+            final V result;
+            final Throwable failure;
+
+            public Result(V result, Throwable failure)
+            {
+                this.result = result;
+                this.failure = failure;
+            }
+        }
+
+        AtomicReference<Result> callbackResult = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        chain.begin((result, failure) -> {
+            callbackResult.set(new Result(result, failure));
+            latch.countDown();
+        });
+
+        if (!latch.await(timeout, unit))
+            throw new TimeoutException();
+        Result result = callbackResult.get();
+        if (result.failure == null) return result.result;
+        else throw new ExecutionException(result.failure);

Review Comment:
   This can be simplified with:
   
   ```
   CompletableFuture<V> future = new CompletableFuture<>();
           chain.begin((result, failure) -> {
               if (failure != null) future.completeExceptionally(failure);
               else future.complete(result);
           });
           return future.get(timeout, unit);
   ```



##########
accord-core/src/main/java/accord/utils/async/AsyncChains.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils.async;
+
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import static accord.utils.async.AsyncChainCombiner.Reduce;
+
+public abstract class AsyncChains<V> implements AsyncChain<V>
+{
+    static class Immediate<V> implements AsyncChain<V>
+    {
+        static class FailureHolder
+        {
+            final Throwable cause;
+            FailureHolder(Throwable cause)
+            {
+                this.cause = cause;
+            }
+        }
+
+        final private Object value;
+        private Immediate(V success) { this.value = success; }
+        private Immediate(Throwable failure) { this.value = new 
FailureHolder(failure); }
+
+        @Override
+        public <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper)
+        {
+            if (value != null && value.getClass() == FailureHolder.class)
+                return (AsyncChain<T>) this;
+            return new Immediate<>(mapper.apply((V) value));
+        }
+
+        @Override
+        public <T> AsyncChain<T> flatMap(Function<? super V, ? extends 
AsyncChain<T>> mapper)
+        {
+            if (value != null && value.getClass() == FailureHolder.class)
+                return (AsyncChain<T>) this;
+            return mapper.apply((V) value);
+        }
+
+        @Override
+        public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> 
callback)
+        {
+            if (value == null || value.getClass() != FailureHolder.class)
+                callback.accept((V) value, null);
+            else
+                callback.accept(null, ((FailureHolder)value).cause);
+            return this;
+        }
+
+        @Override
+        public void begin(BiConsumer<? super V, Throwable> callback)
+        {
+            addCallback(callback);
+        }
+    }
+
+    public abstract static class Head<V> extends AsyncChains<V> implements 
BiConsumer<V, Throwable>
+    {
+        protected Head()
+        {
+            super(null);
+            next = this;
+        }
+
+        void begin()
+        {
+            begin(next);
+        }
+
+        @Override
+        public void accept(V v, Throwable throwable)
+        {
+            // we implement here just to simplify logic a little
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    static abstract class Link<I, O> extends AsyncChains<O> implements 
BiConsumer<I, Throwable>
+    {
+        protected Link(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void begin(BiConsumer<? super O, Throwable> callback)
+        {
+            Preconditions.checkArgument(!(callback instanceof 
AsyncChains.Head));
+            Preconditions.checkState(next instanceof AsyncChains.Head);
+            Head<?> head = (Head<?>) next;
+            next = callback;
+            head.begin();
+        }
+    }
+
+    public static abstract class Map<I, O> extends Link<I, O> implements 
Function<I, O>
+    {
+        Map(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            if (throwable != null) next.accept(null, throwable);
+            else next.accept(apply(i), null);
+        }
+    }
+
+    static class EncapsulatedMap<I, O> extends Map<I, O>
+    {
+        final Function<? super I, ? extends O> map;
+
+        EncapsulatedMap(Head<?> head, Function<? super I, ? extends O> map)
+        {
+            super(head);
+            this.map = map;
+        }
+
+        @Override
+        public O apply(I i)
+        {
+            return map.apply(i);
+        }
+    }
+
+    public static abstract class FlatMap<I, O> extends Link<I, O> implements 
Function<I, AsyncChain<O>>
+    {
+        FlatMap(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            if (throwable != null) next.accept(null, throwable);
+            else apply(i).begin(next);
+        }
+    }
+
+    static class EncapsulatedFlatMap<I, O> extends FlatMap<I, O>
+    {
+        final Function<? super I, ? extends AsyncChain<O>> map;
+
+        EncapsulatedFlatMap(Head<?> head, Function<? super I, ? extends 
AsyncChain<O>> map)
+        {
+            super(head);
+            this.map = map;
+        }
+
+        @Override
+        public AsyncChain<O> apply(I i)
+        {
+            return map.apply(i);
+        }
+    }
+
+    // if extending Callback, be sure to invoke super.accept()
+    static class Callback<I> extends Link<I, I>
+    {
+        Callback(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            next.accept(i, throwable);
+        }
+    }
+
+    static class EncapsulatedCallback<I> extends Callback<I>
+    {
+        final BiConsumer<? super I, Throwable> callback;
+
+        EncapsulatedCallback(Head<?> head, BiConsumer<? super I, Throwable> 
callback)
+        {
+            super(head);
+            this.callback = callback;
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            super.accept(i, throwable);
+            callback.accept(i, throwable);
+        }
+    }
+
+    // either the thing we start, or the thing we do in follow-up
+    BiConsumer<? super V, Throwable> next;
+    AsyncChains(Head<?> head)
+    {
+        this.next = (BiConsumer) head;
+    }
+
+    @Override
+    public <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper)
+    {
+        return add(EncapsulatedMap::new, mapper);
+    }
+
+    @Override
+    public <T> AsyncChain<T> flatMap(Function<? super V, ? extends 
AsyncChain<T>> mapper)
+    {
+        return add(EncapsulatedFlatMap::new, mapper);
+    }
+
+    @Override
+    public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback)
+    {
+        return add(EncapsulatedCallback::new, callback);
+    }
+
+    // can be used by transformations that want efficiency, and can directly 
extend Link, FlatMap or Callback
+    // (or perhaps some additional helper implementations that permit us to 
simply implement apply for Map and FlatMap)
+    <O, T extends AsyncChain<O> & BiConsumer<? super V, Throwable>> 
AsyncChain<O> add(Function<Head<?>, T> factory)
+    {
+        Preconditions.checkState(next instanceof Head<?>);
+        Head<?> head = (Head<?>) next;
+        T result = factory.apply(head);
+        next = result;
+        return result;
+    }
+
+    <P, O, T extends AsyncChain<O> & BiConsumer<? super V, Throwable>> 
AsyncChain<O> add(BiFunction<Head<?>, P, T> factory, P param)
+    {
+        Preconditions.checkState(next instanceof Head<?>);
+        Head<?> head = (Head<?>) next;
+        T result = factory.apply(head, param);
+        next = result;
+        return result;
+    }
+
+    private static <V> Runnable encapsulate(Callable<V> callable, BiConsumer<? 
super V, Throwable> receiver)
+    {
+        return () -> {
+            try
+            {
+                V result = callable.call();
+                receiver.accept(result, null);
+            }
+            catch (Throwable t)
+            {
+                receiver.accept(null, t);
+            }
+        };
+    }
+
+    private static Runnable encapsulate(Runnable runnable, BiConsumer<? super 
Void, Throwable> receiver)
+    {
+        return () -> {
+            try
+            {
+                runnable.run();
+                receiver.accept(null, null);
+            }
+            catch (Throwable t)
+            {
+                receiver.accept(null, t);
+            }
+        };
+    }
+
+    public static <V> AsyncChain<V> success(V success)
+    {
+        return new Immediate<>(success);
+    }
+
+    public static <V> AsyncChain<V> failure(Throwable failure)
+    {
+        return new Immediate<>(failure);
+    }
+
+    public static <V> AsyncChain<V> ofCallable(Executor executor, Callable<V> 
callable)
+    {
+        return new Head<V>()
+        {
+            @Override
+            public void begin(BiConsumer<? super V, Throwable> next)
+            {
+                executor.execute(encapsulate(callable, next));
+            }
+        };
+    }
+
+    public static AsyncChain<Void> ofRunnable(Executor executor, Runnable 
runnable)
+    {
+        return new Head<Void>()
+        {
+            @Override
+            public void begin(BiConsumer<? super Void, Throwable> callback)
+            {
+                executor.execute(AsyncChains.encapsulate(runnable, callback));
+            }
+        };
+    }
+
+    public static <V> AsyncChain<List<V>> all(List<AsyncChain<V>> chains)
+    {
+        Preconditions.checkArgument(!chains.isEmpty());
+        return new AsyncChainCombiner.All<>(chains);
+    }
+
+    public static <V> AsyncChain<V> reduce(List<AsyncChain<V>> chains, 
BiFunction<V, V, V> reducer)
+    {
+        Preconditions.checkArgument(!chains.isEmpty());
+        if (chains.size() == 1)
+            return chains.get(0);
+        if (Reduce.canAppendTo(chains.get(0), reducer))
+        {
+            AsyncChainCombiner.Reduce<V> appendTo = 
(AsyncChainCombiner.Reduce<V>) chains.get(0);
+            appendTo.addAll(chains.subList(1, chains.size()));
+            return appendTo;
+        }
+        return new Reduce<>(chains, reducer);
+    }
+
+    public static <V> AsyncChain<V> reduce(AsyncChain<V> a, AsyncChain<V> b, 
BiFunction<V, V, V> reducer)
+    {
+        if (Reduce.canAppendTo(a, reducer))
+        {
+            AsyncChainCombiner.Reduce<V> appendTo = 
(AsyncChainCombiner.Reduce<V>) a;
+            appendTo.add(b);
+            return a;
+        }
+        return new Reduce<>(Lists.newArrayList(a, b), reducer);
+    }
+
+    public static <V> V getBlocking(AsyncChain<V> chain) throws 
InterruptedException, ExecutionException
+    {
+        class Result
+        {
+            final V result;
+            final Throwable failure;
+
+            public Result(V result, Throwable failure)
+            {
+                this.result = result;
+                this.failure = failure;
+            }
+        }
+
+        AtomicReference<Result> callbackResult = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        chain.begin((result, failure) -> {
+            callbackResult.set(new Result(result, failure));
+            latch.countDown();
+        });
+
+        latch.await();
+        Result result = callbackResult.get();
+        if (result.failure == null) return result.result;
+        else throw new ExecutionException(result.failure);
+    }

Review Comment:
   This can be simplified to:
   
   ```
   CompletableFuture<V> future = new CompletableFuture<>();
   chain.begin((result, failure) -> {
       if (failure != null) future.completeExceptionally(failure);
       else future.complete(result);
   });
   return future.get();
   ```



##########
accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java:
##########
@@ -0,0 +1,45 @@
+package accord.utils.async;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+
+public class AsyncCallbacks
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AsyncCallbacks.class);
+
+    private static final BiConsumer<Object, Throwable> NOOP = (unused, 
failure) -> {
+        if (failure != null)
+            logger.error("Exception received by noop callback", failure);
+    };
+
+    public static <T> BiConsumer<? super T, Throwable> noop()
+    {
+        return NOOP;
+    }
+
+    public static <T> BiConsumer<? super T, Throwable> inExecutor(BiConsumer<? 
super T, Throwable> callback, Executor executor)

Review Comment:
   I 100% acknowledge that this wouldn't be consistent with 
`org.apache.cassandra.utils.concurrent.Future`



##########
accord-core/src/main/java/accord/utils/async/AsyncResult.java:
##########
@@ -0,0 +1,86 @@
+package accord.utils.async;
+
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+
+/**
+ * Handle for async computations that supports multiple listeners and 
registering
+ * listeners after the computation has started
+ */
+public interface AsyncResult<V>
+{
+    void addCallback(BiConsumer<? super V, Throwable> callback);
+
+    default void addCallback(Runnable runnable)
+    {
+        addCallback((unused, failure) -> {
+            if (failure == null) runnable.run();
+            else throw new RuntimeException(failure);
+        });

Review Comment:
   We should do the refactor I mentioned in `AsyncCallbacks`; this would let us do 
the following:
   
   ```
   default void addCallback(Runnable runnable)
   {
       addCallback(AsyncCallbacks.wrap(runnable));
   }
   ```



##########
accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java:
##########
@@ -0,0 +1,150 @@
+package accord.utils.async;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+abstract class AsyncChainCombiner<I, O> extends AsyncChains.Head<O>
+{
+    private static final AtomicIntegerFieldUpdater<AsyncChainCombiner> 
REMAINING = AtomicIntegerFieldUpdater.newUpdater(AsyncChainCombiner.class, 
"remaining");
+    private volatile Object state;
+    private volatile BiConsumer<? super O, Throwable> callback;
+    private volatile int remaining;
+
+    protected AsyncChainCombiner(List<AsyncChain<I>> inputs)
+    {
+        Preconditions.checkArgument(!inputs.isEmpty());
+        this.state = inputs;
+    }
+
+    private List<AsyncChain<? extends I>> inputs()
+    {
+        Object current = state;
+        Preconditions.checkState(current instanceof List);
+        return (List<AsyncChain<? extends I>>) current;
+    }
+
+    private I[] results()
+    {
+        Object current = state;
+        Preconditions.checkState(current instanceof Object[]);
+        return (I[]) current;
+    }
+
+    void add(AsyncChain<I> chain)
+    {
+        inputs().add(chain);
+    }
+
+    void addAll(List<AsyncChain<I>> chains)
+    {
+        inputs().addAll(chains);
+    }
+
+    int size()
+    {
+        Object current = state;
+        if (current instanceof List)
+            return ((List) current).size();
+        if (current instanceof Object[])
+            return ((Object[]) current).length;
+        throw new IllegalStateException();
+    }
+
+    abstract void complete(I[] results, BiConsumer<? super O, Throwable> 
callback);
+
+    private void callback(int idx, I result, Throwable throwable)
+    {
+        int current = remaining;
+        if (current == 0)
+            return;
+
+        if (throwable != null && REMAINING.compareAndSet(this, current, 0))

Review Comment:
   I think it should be the following:
   
   ```
   if (throwable != null)
   {
       if (markFailed())
           callback.accept(null, throwable);
   
       return;
   }
   ...
   private boolean markFailed()
   {
       int current;
       do
       {
           current = remaining;
           if (current == 0)
               return false;
       }
       while (!REMAINING.compareAndSet(this, current, 0));
       return true;
   }
   ```
   
   This should also handle the case where multiple exceptions are seen at the 
same time.



##########
accord-core/src/main/java/accord/utils/async/AsyncResults.java:
##########
@@ -0,0 +1,309 @@
+package accord.utils.async;
+
+import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+public class AsyncResults
+{
+    private AsyncResults() {}
+
+    private static class Result<V>
+    {
+        final V value;
+        final Throwable failure;
+
+        public Result(V value, Throwable failure)
+        {
+            this.value = value;
+            this.failure = failure;
+        }
+    }
+
+    static class AbstractResult<V> implements AsyncResult<V>
+    {
+        private static final AtomicReferenceFieldUpdater<AbstractResult, 
Object> STATE = AtomicReferenceFieldUpdater.newUpdater(AbstractResult.class, 
Object.class, "state");
+
+        private volatile Object state;
+
+        private static class Listener<V>
+        {
+            final BiConsumer<? super V, Throwable> callback;
+            Listener<V> next;
+
+            public Listener(BiConsumer<? super V, Throwable> callback)
+            {
+                this.callback = callback;
+            }
+        }
+
+        private void notify(Listener<V> listener, Result<V> result)
+        {
+            while (listener != null)
+            {
+                listener.callback.accept(result.value, result.failure);
+                listener = listener.next;
+            }
+        }
+
+        boolean trySetResult(Result<V> result)
+        {
+            while (true)
+            {
+                Object current = state;
+                if (current instanceof Result)
+                    return false;
+                Listener<V> listener = (Listener<V>) current;
+                if (STATE.compareAndSet(this, current, result))
+                {
+                    notify(listener, result);
+                    return true;
+                }
+            }
+        }
+
+        boolean trySetResult(V result, Throwable failure)
+        {
+            return trySetResult(new Result<>(result, failure));
+        }
+
+        void setResult(Result<V> result)
+        {
+            if (!trySetResult(result))
+                throw new IllegalStateException("Result has already been set 
on " + this);
+        }
+
+        void setResult(V result, Throwable failure)
+        {
+            if (!trySetResult(result, failure))
+                throw new IllegalStateException("Result has already been set 
on " + this);
+        }
+
+        @Override
+        public void addCallback(BiConsumer<? super V, Throwable> callback)
+        {
+            Listener<V> listener = null;
+            while (true)
+            {
+                Object current = state;
+                if (current instanceof Result)
+                {
+                    Result<V> result = (Result<V>) current;
+                    callback.accept(result.value, result.failure);
+                    return;
+                }
+                if (listener == null)
+                    listener = new Listener<>(callback);
+
+                listener.next = (Listener<V>) current;
+                if (STATE.compareAndSet(this, current, listener))
+                    return;
+            }
+        }
+
+        @Override
+        public boolean isDone()
+        {
+            return state instanceof Result;
+        }
+
+        @Override
+        public boolean isSuccess()
+        {
+            Object current = state;
+            return current instanceof Result && ((Result) current).failure == 
null;
+        }
+    }
+
+    static class Chain<V> extends AbstractResult<V>
+    {
+        public Chain(AsyncChain<V> chain)
+        {
+            chain.begin(this::setResult);
+        }
+    }
+
+    public static class Settable<V> extends AbstractResult<V> implements 
AsyncResult.Settable<V>
+    {
+
+        @Override
+        public boolean trySuccess(V value)
+        {
+            return trySetResult(value, null);
+        }
+
+        @Override
+        public boolean tryFailure(Throwable throwable)
+        {
+            return trySetResult(null, throwable);
+        }
+    }
+
+    static class Immediate<V> implements AsyncResult<V>
+    {
+        private final V value;
+        private final Throwable failure;
+
+        Immediate(V value)
+        {
+            this.value = value;
+            this.failure = null;
+        }
+
+        Immediate(Throwable failure)
+        {
+            this.value = null;
+            this.failure = failure;
+        }
+
+        @Override
+        public void addCallback(BiConsumer<? super V, Throwable> callback)
+        {
+            callback.accept(value, failure);
+        }
+
+        @Override
+        public boolean isDone()
+        {
+            return true;
+        }
+
+        @Override
+        public boolean isSuccess()
+        {
+            return failure == null;
+        }
+    }
+
+    /**
+     * Creates an AsyncResult for the given chain. This calls begin on the 
supplied chain
+     */
+    public static <V> AsyncResult<V> forChain(AsyncChain<V> chain)
+    {
+        return new Chain<>(chain);
+    }
+
+    public static <V> AsyncResult<V> success(V value)
+    {
+        return new Immediate<>(value);
+    }
+
+    public static <V> AsyncResult<V> failure(Throwable failure)
+    {
+        return new Immediate<>(failure);
+    }
+
+    public static <V> AsyncResult<V> ofCallable(Executor executor, Callable<V> 
callable)
+    {
+        Settable<V> result = new Settable<V>();
+        executor.execute(() -> {
+            try
+            {
+                result.trySuccess(callable.call());
+            }
+            catch (Exception e)
+            {
+                result.tryFailure(e);
+            }
+        });
+        return result;
+    }
+
+    public static AsyncResult<Void> ofRunnable(Executor executor, Runnable 
runnable)
+    {
+        Settable<Void> result = new Settable<Void>();
+        executor.execute(() -> {
+            try
+            {
+                runnable.run();
+                result.trySuccess(null);
+            }
+            catch (Exception e)
+            {
+                result.tryFailure(e);
+            }
+        });
+        return result;
+    }
+
+    public static <V> AsyncResult.Settable<V> settable()
+    {
+        return new Settable<>();
+    }
+
+    private static <V> List<AsyncChain<V>> toChains(List<AsyncResult<V>> 
results)
+    {
+        List<AsyncChain<V>> chains = new ArrayList<>(results.size());
+        for (int i=0,mi=results.size(); i<mi; i++)
+            chains.add(results.get(i).toChain());
+        return chains;
+    }
+
+    public static <V> AsyncChain<List<V>> all(List<AsyncResult<V>> results)
+    {
+        Preconditions.checkArgument(!results.isEmpty());
+        return new AsyncChainCombiner.All<>(toChains(results));
+    }
+
+    public static <V> AsyncChain<V> reduce(List<AsyncResult<V>> results, 
BiFunction<V, V, V> reducer)
+    {
+        Preconditions.checkArgument(!results.isEmpty());
+        if (results.size() == 1)
+            return results.get(0).toChain();
+        return new AsyncChainCombiner.Reduce<>(toChains(results), reducer);
+    }
+
+    public static <V> V getBlocking(AsyncResult<V> asyncResult) throws 
InterruptedException, ExecutionException
+    {
+        AtomicReference<Result<V>> callbackResult = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);

Review Comment:
   I gave feedback in `AsyncChains` to switch to `CompletableFuture` to 
simplify the code; I guess it would have the same issue with the simulator...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to