bdeggleston commented on code in PR #17:
URL: https://github.com/apache/cassandra-accord/pull/17#discussion_r1052407260


##########
accord-core/src/main/java/accord/utils/async/AsyncChains.java:
##########
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils.async;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import static accord.utils.async.AsyncChainCombiner.Reduce;
+
+public abstract class AsyncChains<V> implements AsyncChain<V>
+{
+    static class Immediate<V> implements AsyncChain<V>
+    {
+        static class FailureHolder
+        {
+            final Throwable cause;
+            FailureHolder(Throwable cause)
+            {
+                this.cause = cause;
+            }
+        }
+
+        final private Object value;
+        private Immediate(V success) { this.value = success; }
+        private Immediate(Throwable failure) { this.value = new 
FailureHolder(failure); }
+
+        @Override
+        public <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper)
+        {
+            if (value != null && value.getClass() == FailureHolder.class)
+                return (AsyncChain<T>) this;
+            return new Immediate<>(mapper.apply((V) value));
+        }
+
+        @Override
+        public <T> AsyncChain<T> flatMap(Function<? super V, ? extends 
AsyncChain<T>> mapper)
+        {
+            if (value != null && value.getClass() == FailureHolder.class)
+                return (AsyncChain<T>) this;
+            return mapper.apply((V) value);
+        }
+
+        @Override
+        public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> 
callback)
+        {
+            if (value == null || value.getClass() != FailureHolder.class)
+                callback.accept((V) value, null);
+            else
+                callback.accept(null, ((FailureHolder)value).cause);
+            return this;
+        }
+
+        @Override
+        public void begin(BiConsumer<? super V, Throwable> callback)
+        {
+            addCallback(callback);
+        }
+    }
+
+    public abstract static class Head<V> extends AsyncChains<V> implements 
BiConsumer<V, Throwable>
+    {
+        protected Head()
+        {
+            super(null);
+            next = this;
+        }
+
+        void begin()
+        {
+            begin(next);
+        }
+
+        @Override
+        public void accept(V v, Throwable throwable)
+        {
+            // we implement here just to simplify logic a little
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    static abstract class Link<I, O> extends AsyncChains<O> implements 
BiConsumer<I, Throwable>
+    {
+        protected Link(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void begin(BiConsumer<? super O, Throwable> callback)
+        {
+            Preconditions.checkArgument(!(callback instanceof 
AsyncChains.Head));
+            Preconditions.checkState(next instanceof AsyncChains.Head);
+            Head<?> head = (Head<?>) next;
+            next = callback;
+            head.begin();
+        }
+    }
+
+    public static abstract class Map<I, O> extends Link<I, O> implements 
Function<I, O>
+    {
+        Map(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            if (throwable != null) next.accept(null, throwable);
+            else next.accept(apply(i), null);
+        }
+    }
+
+    static class EncapsulatedMap<I, O> extends Map<I, O>
+    {
+        final Function<? super I, ? extends O> map;
+
+        EncapsulatedMap(Head<?> head, Function<? super I, ? extends O> map)
+        {
+            super(head);
+            this.map = map;
+        }
+
+        @Override
+        public O apply(I i)
+        {
+            return map.apply(i);
+        }
+    }
+
+    public static abstract class FlatMap<I, O> extends Link<I, O> implements 
Function<I, AsyncChain<O>>
+    {
+        FlatMap(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            if (throwable != null) next.accept(null, throwable);
+            else apply(i).begin(next);
+        }
+    }
+
+    static class EncapsulatedFlatMap<I, O> extends FlatMap<I, O>
+    {
+        final Function<? super I, ? extends AsyncChain<O>> map;
+
+        EncapsulatedFlatMap(Head<?> head, Function<? super I, ? extends 
AsyncChain<O>> map)
+        {
+            super(head);
+            this.map = map;
+        }
+
+        @Override
+        public AsyncChain<O> apply(I i)
+        {
+            return map.apply(i);
+        }
+    }
+
+    // if extending Callback, be sure to invoke super.accept()
+    static class Callback<I> extends Link<I, I>
+    {
+        Callback(Head<?> head)
+        {
+            super(head);
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            next.accept(i, throwable);
+        }
+    }
+
+    static class EncapsulatedCallback<I> extends Callback<I>
+    {
+        final BiConsumer<? super I, Throwable> callback;
+
+        EncapsulatedCallback(Head<?> head, BiConsumer<? super I, Throwable> 
callback)
+        {
+            super(head);
+            this.callback = callback;
+        }
+
+        @Override
+        public void accept(I i, Throwable throwable)
+        {
+            super.accept(i, throwable);
+            callback.accept(i, throwable);
+        }
+    }
+
+    // either the thing we start, or the thing we do in follow-up
+    BiConsumer<? super V, Throwable> next;
+    AsyncChains(Head<?> head)
+    {
+        this.next = (BiConsumer) head;
+    }
+
+    @Override
+    public <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper)
+    {
+        return add(EncapsulatedMap::new, mapper);
+    }
+
+    @Override
+    public <T> AsyncChain<T> flatMap(Function<? super V, ? extends 
AsyncChain<T>> mapper)
+    {
+        return add(EncapsulatedFlatMap::new, mapper);
+    }
+
+    @Override
+    public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback)
+    {
+        return add(EncapsulatedCallback::new, callback);
+    }
+
+    // can be used by transformations that want efficiency, and can directly 
extend Link, FlatMap or Callback
+    // (or perhaps some additional helper implementations that permit us to 
simply implement apply for Map and FlatMap)
+    <O, T extends AsyncChain<O> & BiConsumer<? super V, Throwable>> 
AsyncChain<O> add(Function<Head<?>, T> factory)
+    {
+        Preconditions.checkState(next instanceof Head<?>);
+        Head<?> head = (Head<?>) next;
+        T result = factory.apply(head);
+        next = result;
+        return result;
+    }
+
+    <P, O, T extends AsyncChain<O> & BiConsumer<? super V, Throwable>> 
AsyncChain<O> add(BiFunction<Head<?>, P, T> factory, P param)
+    {
+        Preconditions.checkState(next instanceof Head<?>);
+        Head<?> head = (Head<?>) next;
+        T result = factory.apply(head, param);
+        next = result;
+        return result;
+    }
+
+    private static <V> Runnable encapsulate(Callable<V> callable, BiConsumer<? 
super V, Throwable> receiver)
+    {
+        return () -> {
+            try
+            {
+                V result = callable.call();
+                receiver.accept(result, null);
+            }
+            catch (Throwable t)
+            {
+                receiver.accept(null, t);
+            }
+        };
+    }
+
+    private static Runnable encapsulate(Runnable runnable, BiConsumer<? super 
Void, Throwable> receiver)
+    {
+        return () -> {
+            try
+            {
+                runnable.run();
+                receiver.accept(null, null);
+            }
+            catch (Throwable t)
+            {
+                receiver.accept(null, t);
+            }
+        };
+    }
+
+    public static <V> AsyncChain<V> success(V success)
+    {
+        return new Immediate<>(success);
+    }
+
+    public static <V> AsyncChain<V> failure(Throwable failure)
+    {
+        return new Immediate<>(failure);
+    }
+
+    public static <V> AsyncChain<V> ofCallable(Executor executor, Callable<V> 
callable)
+    {
+        return new Head<V>()
+        {
+            @Override
+            public void begin(BiConsumer<? super V, Throwable> next)
+            {
+                executor.execute(encapsulate(callable, next));
+            }
+        };
+    }
+
+    public static AsyncChain<Void> ofRunnable(Executor executor, Runnable 
runnable)
+    {
+        return new Head<Void>()
+        {
+            @Override
+            public void begin(BiConsumer<? super Void, Throwable> callback)
+            {
+                executor.execute(AsyncChains.encapsulate(runnable, callback));
+            }
+        };
+    }
+
+    public static <V> AsyncChain<List<V>> all(List<AsyncChain<V>> chains)
+    {
+        Preconditions.checkArgument(!chains.isEmpty());
+        return new AsyncChainCombiner.All<>(chains);
+    }
+
+    public static <V> AsyncChain<V> reduce(List<AsyncChain<V>> chains, 
BiFunction<V, V, V> reducer)
+    {
+        Preconditions.checkArgument(!chains.isEmpty());
+        if (chains.size() == 1)
+            return chains.get(0);

Review Comment:
   Right, this is about the same as `Stream#reduce`, where the reducer is only 
called if there is more than one result to feed it. The `Optional` returned by 
`Stream#reduce` is to support empty streams. This is explicitly not supported 
by this reduce method as you can see by the not-empty check. Everywhere this is 
currently used, it would be an error to not have at least one chain to reduce. 
I wouldn't be opposed to adding a zero parameter if that changes though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to