dcapwell commented on code in PR #10:
URL: https://github.com/apache/cassandra-accord/pull/10#discussion_r993936450


##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -380,119 +402,155 @@ public void onChange(Command command)
             case Executed:
             case Applied:
             case Invalidated:
-                if (waitingOnApply != null)
+                if (isUnableToApply())
                 {
                     updatePredecessor(command);
-                    if (waitingOnCommit != null)
+                    if (isWaitingOnCommit())
                     {
-                        if (waitingOnCommit.remove(command.txnId) != null && 
waitingOnCommit.isEmpty())
-                            waitingOnCommit = null;
+                        removeWaitingOnCommit(command);
                     }
-                    if (waitingOnCommit == null && waitingOnApply.isEmpty())
-                        waitingOnApply = null;
                 }
                 else
                 {
                     command.removeListener(this);
                 }
-                maybeExecute(true);
+                maybeExecute(false);
                 break;
         }
     }
 
-    private void maybeExecute(boolean notifyListeners)
+    @Override
+    public void onChange(Command command)
     {
-        if (status != Committed && status != Executed)
-            return;
+        onChangeInternal(command);
+    }
 
-        if (waitingOnApply != null)
+    protected void postApply()
+    {
+        logger.trace("{} applied, setting status to Applied and notifying 
listeners", txnId());
+        status(Applied);
+        notifyListeners();
+    }
+
+    private static Function<CommandStore, Void> callPostApply(TxnId txnId)
+    {
+        return commandStore -> {
+            commandStore.command(txnId).postApply();
+            return null;
+        };
+    }
+
+    protected Future<Void> apply()
+    {
+        // important: we can't include a reference to *this* in the lambda, 
since the C* implementation may evict
+        // the command instance from memory between now and the write 
completing (and post apply being called)
+        return writes().apply(commandStore()).flatMap(unused ->
+            commandStore().process(this, callPostApply(txnId()))
+        );
+    }
+
+    public Future<Data> read(Keys scope)

Review Comment:
   should we remove `Keys scope`?  The previous usage was just to populate the 
`ReadFuture.scope` and was ignored



##########
accord-core/src/main/java/accord/txn/Txn.java:
##########
@@ -18,93 +18,140 @@
 
 package accord.txn;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
 
 import accord.api.*;
 import accord.local.*;
 import accord.primitives.Keys;
 import accord.primitives.Timestamp;
+import accord.utils.ReducingFuture;
+import org.apache.cassandra.utils.concurrent.Future;
 
-public class Txn
-{
-    enum Kind { READ, WRITE }
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
-    final Kind kind;
-    public final Keys keys;
-    public final Read read;
-    public final Query query;
-    public final Update update;
+public abstract class Txn
+{
+    public enum Kind { READ, WRITE }
 
-    public Txn(Keys keys, Read read, Query query)
+    public static class InMemory extends Txn
     {
-        this.kind = Kind.READ;
-        this.keys = keys;
-        this.read = read;
-        this.query = query;
-        this.update = null;
+        private final Kind kind;
+        private final Keys keys;
+        private final Read read;
+        private final Query query;
+        private final Update update;
+
+        public InMemory(@Nonnull Keys keys, @Nonnull Read read, @Nonnull Query 
query)
+        {
+            this.kind = Kind.READ;
+            this.keys = keys;
+            this.read = read;
+            this.query = query;
+            this.update = null;
+        }
+
+        public InMemory(@Nonnull Keys keys, @Nonnull Read read, @Nonnull Query 
query, @Nullable Update update)
+        {
+            this.kind = Kind.WRITE;
+            this.keys = keys;
+            this.read = read;
+            this.update = update;
+            this.query = query;
+        }
+
+        @Override
+        public Kind kind()
+        {
+            return kind;
+        }
+
+        @Override
+        public Keys keys()
+        {
+            return keys;
+        }
+
+        @Override
+        public Read read()
+        {
+            return read;
+        }
+
+        @Override
+        public Query query()
+        {
+            return query;
+        }
+
+        @Override
+        public Update update()
+        {
+            return update;
+        }
     }
 
-    public Txn(Keys keys, Read read, Query query, Update update)
-    {
-        this.kind = Kind.WRITE;
-        this.keys = keys;
-        this.read = read;
-        this.update = update;
-        this.query = query;
-    }
+    public abstract @Nonnull Kind kind();
+    public abstract @Nonnull Keys keys();
+    public abstract @Nonnull Read read();
+    public abstract @Nonnull Query query();
+    public abstract @Nullable Update update();
 
     @Override
     public boolean equals(Object o)
     {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         Txn txn = (Txn) o;
-        return kind == txn.kind && keys.equals(txn.keys) && 
read.equals(txn.read) && query.equals(txn.query) && Objects.equals(update, 
txn.update);
+        return kind() == txn.kind()
+                && keys().equals(txn.keys())
+                && read().equals(txn.read())
+                && query().equals(txn.query())
+                && Objects.equals(update(), txn.update());
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(kind, keys, read, query, update);
+        return Objects.hash(kind(), keys(), read(), query(), update());
     }
 
     public boolean isWrite()
     {
-        return kind == Kind.WRITE;
+        return kind() == Kind.WRITE;
     }
 
     public Result result(Data data)
     {
-        return query.compute(data, read, update);
+        return query().compute(data, read(), update());
     }
 
     public Writes execute(Timestamp executeAt, Data data)
     {
-        if (update == null)
-            return new Writes(executeAt, keys, null);
+        if (update() == null)
+            return new Writes(executeAt, keys(), null);
 
-        return new Writes(executeAt, keys, update.apply(data));
-    }
-
-    public Keys keys()
-    {
-        return keys;
+        return new Writes(executeAt, keys(), update().apply(data));
     }
 
     public String toString()
     {
-        return "{read:" + read.toString() + (update != null ? ", update:" + 
update : "") + '}';
+        return "{read:" + read().toString() + (update() != null ? ", update:" 
+ update() : "") + '}';
     }
 
-    public Data read(Command command, Keys keys)
+    public Future<Data> read(Command command)
     {
-        return 
keys.foldl(command.commandStore.ranges().at(command.executeAt().epoch), (index, 
key, accumulate) -> {
-            CommandStore commandStore = command.commandStore;
+        List<Future<Data>> futures = 
keys().foldl(command.commandStore().ranges().at(command.executeAt().epoch), 
(index, key, accumulate) -> {
+            CommandStore commandStore = command.commandStore();
             if (!commandStore.hashIntersects(key))
                 return accumulate;
 
-            Data result = read.read(key, command.executeAt(), 
commandStore.store());
-            return accumulate != null ? accumulate.merge(result) : result;
-        }, null);
+            Future<Data> result = read().read(key, isWrite(), 
command.commandStore(), command.executeAt(), commandStore.store());
+            accumulate.add(result);
+            return accumulate;
+        }, new ArrayList<>());
+        return ReducingFuture.reduce(futures, (d1, d2) -> d1.merge(d2));

Review Comment:
   can we do `return ReducingFuture.reduce(futures, Data::merge);` instead?



##########
accord-core/src/main/java/accord/utils/ReducingFuture.java:
##########
@@ -0,0 +1,56 @@
+package accord.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.BiFunction;
+
+public class ReducingFuture<V> extends AsyncPromise<V>
+{
+    private static final AtomicIntegerFieldUpdater<ReducingFuture> 
PENDING_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ReducingFuture.class, 
"pending");
+    private final List<? extends Future<V>> futures;
+    private final BiFunction<V, V, V> reducer;
+    private volatile int pending;
+
+    protected ReducingFuture(List<? extends Future<V>> futures, BiFunction<V, 
V, V> reducer)
+    {
+        this.futures = futures;
+        this.reducer = reducer;
+        this.pending = futures.size();
+        if (futures.size() == 0)
+            trySuccess(null);
+        futures.forEach(f -> f.addListener(this::operationComplete));
+    }
+
+    private <F extends io.netty.util.concurrent.Future<?>> void 
operationComplete(F future) throws Exception
+    {
+        if (isDone())
+            return;
+
+        if (!future.isSuccess())
+        {
+            tryFailure(future.cause());
+        }
+        else if (PENDING_UPDATER.decrementAndGet(this) == 0)
+        {
+            V result = futures.get(0).getNow();
+            for (int i=1, mi=futures.size(); i<mi; i++)
+                result = reducer.apply(result, futures.get(i).getNow());
+
+            trySuccess(result);
+        }
+    }
+
+    public static <T> Future<T> reduce(List<? extends Future<T>> futures, 
BiFunction<T, T, T> reducer)
+    {
+        Preconditions.checkArgument(!futures.isEmpty());

Review Comment:
   I know Apache Cassandra doesn't provide messages often, but I find that 
makes life much harder... can we have a simple message like "future list is 
empty"?



##########
accord-core/src/main/java/accord/utils/ReducingFuture.java:
##########
@@ -0,0 +1,56 @@
+package accord.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.BiFunction;
+
+public class ReducingFuture<V> extends AsyncPromise<V>
+{
+    private static final AtomicIntegerFieldUpdater<ReducingFuture> 
PENDING_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ReducingFuture.class, 
"pending");
+    private final List<? extends Future<V>> futures;
+    private final BiFunction<V, V, V> reducer;
+    private volatile int pending;
+
+    protected ReducingFuture(List<? extends Future<V>> futures, BiFunction<V, 
V, V> reducer)

Review Comment:
   can we make this private?



##########
accord-core/src/main/java/accord/utils/ReducingFuture.java:
##########
@@ -0,0 +1,57 @@
+package accord.utils;
+
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.BiFunction;
+
+public class ReducingFuture<V> extends AsyncPromise<V>
+{
+    private static final AtomicIntegerFieldUpdater<ReducingFuture> 
PENDING_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ReducingFuture.class, 
"pending");
+    private final List<? extends Future<V>> futures;
+    private final BiFunction<V, V, V> reducer;
+    private volatile int pending;
+
+    protected ReducingFuture(List<? extends Future<V>> futures, BiFunction<V, 
V, V> reducer)
+    {
+        this.futures = futures;
+        this.reducer = reducer;
+        this.pending = futures.size();
+        if (futures.size() == 0)
+            trySuccess(null);

Review Comment:
   The constructor is no longer used and `reduce` prevents empty, so we can 
remove the `trySuccess(null);`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to