belliottsmith commented on code in PR #17:
URL: https://github.com/apache/cassandra-accord/pull/17#discussion_r1056545960


##########
accord-core/src/main/java/accord/utils/async/AsyncResults.java:
##########
@@ -0,0 +1,309 @@
+package accord.utils.async;
+
+import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+public class AsyncResults
+{
+    private AsyncResults() {}
+
+    private static class Result<V>
+    {
+        final V value;
+        final Throwable failure;
+
+        public Result(V value, Throwable failure)
+        {
+            this.value = value;
+            this.failure = failure;
+        }
+    }
+
+    static class AbstractResult<V> implements AsyncResult<V>
+    {
+        private static final AtomicReferenceFieldUpdater<AbstractResult, 
Object> STATE = AtomicReferenceFieldUpdater.newUpdater(AbstractResult.class, 
Object.class, "state");
+
+        private volatile Object state;
+
+        private static class Listener<V>
+        {
+            final BiConsumer<? super V, Throwable> callback;
+            Listener<V> next;
+
+            public Listener(BiConsumer<? super V, Throwable> callback)
+            {
+                this.callback = callback;
+            }
+        }
+
+        private void notify(Listener<V> listener, Result<V> result)
+        {
+            while (listener != null)
+            {
+                listener.callback.accept(result.value, result.failure);
+                listener = listener.next;
+            }
+        }
+
+        boolean trySetResult(Result<V> result)
+        {
+            while (true)
+            {
+                Object current = state;
+                if (current instanceof Result)
+                    return false;
+                Listener<V> listener = (Listener<V>) current;
+                if (STATE.compareAndSet(this, current, result))
+                {
+                    notify(listener, result);
+                    return true;
+                }
+            }
+        }
+
+        boolean trySetResult(V result, Throwable failure)
+        {
+            return trySetResult(new Result<>(result, failure));
+        }
+
+        void setResult(Result<V> result)
+        {
+            if (!trySetResult(result))
+                throw new IllegalStateException("Result has already been set 
on " + this);
+        }
+
+        void setResult(V result, Throwable failure)
+        {
+            if (!trySetResult(result, failure))
+                throw new IllegalStateException("Result has already been set 
on " + this);
+        }
+
+        @Override
+        public void addCallback(BiConsumer<? super V, Throwable> callback)
+        {
+            Listener<V> listener = null;
+            while (true)
+            {
+                Object current = state;
+                if (current instanceof Result)
+                {
+                    Result<V> result = (Result<V>) current;
+                    callback.accept(result.value, result.failure);
+                    return;
+                }
+                if (listener == null)
+                    listener = new Listener<>(callback);
+
+                listener.next = (Listener<V>) current;
+                if (STATE.compareAndSet(this, current, listener))
+                    return;
+            }
+        }
+
+        @Override
+        public boolean isDone()
+        {
+            return state instanceof Result;
+        }
+
+        @Override
+        public boolean isSuccess()
+        {
+            Object current = state;
+            return current instanceof Result && ((Result) current).failure == 
null;
+        }
+    }
+
+    static class Chain<V> extends AbstractResult<V>
+    {
+        public Chain(AsyncChain<V> chain)
+        {
+            chain.begin(this::setResult);
+        }
+    }
+
+    public static class Settable<V> extends AbstractResult<V> implements 
AsyncResult.Settable<V>
+    {
+
+        @Override
+        public boolean trySuccess(V value)
+        {
+            return trySetResult(value, null);
+        }
+
+        @Override
+        public boolean tryFailure(Throwable throwable)
+        {
+            return trySetResult(null, throwable);
+        }
+    }
+
+    static class Immediate<V> implements AsyncResult<V>
+    {
+        private final V value;
+        private final Throwable failure;
+
+        Immediate(V value)
+        {
+            this.value = value;
+            this.failure = null;
+        }
+
+        Immediate(Throwable failure)
+        {
+            this.value = null;
+            this.failure = failure;
+        }
+
+        @Override
+        public void addCallback(BiConsumer<? super V, Throwable> callback)
+        {
+            callback.accept(value, failure);
+        }
+
+        @Override
+        public boolean isDone()
+        {
+            return true;
+        }
+
+        @Override
+        public boolean isSuccess()
+        {
+            return failure == null;
+        }
+    }
+
+    /**
+     * Creates an AsyncResult for the given chain. This calls begin on the 
supplied chain
+     */
+    public static <V> AsyncResult<V> forChain(AsyncChain<V> chain)
+    {
+        return new Chain<>(chain);
+    }
+
+    public static <V> AsyncResult<V> success(V value)
+    {
+        return new Immediate<>(value);
+    }
+
+    public static <V> AsyncResult<V> failure(Throwable failure)
+    {
+        return new Immediate<>(failure);
+    }
+
+    public static <V> AsyncResult<V> ofCallable(Executor executor, Callable<V> 
callable)
+    {
+        Settable<V> result = new Settable<V>();
+        executor.execute(() -> {
+            try
+            {
+                result.trySuccess(callable.call());
+            }
+            catch (Exception e)
+            {
+                result.tryFailure(e);
+            }
+        });
+        return result;
+    }
+
+    public static AsyncResult<Void> ofRunnable(Executor executor, Runnable 
runnable)
+    {
+        Settable<Void> result = new Settable<Void>();
+        executor.execute(() -> {
+            try
+            {
+                runnable.run();
+                result.trySuccess(null);
+            }
+            catch (Exception e)
+            {
+                result.tryFailure(e);
+            }
+        });
+        return result;
+    }

Review Comment:
   Agreed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to