aweisberg commented on code in PR #80:
URL: https://github.com/apache/cassandra-accord/pull/80#discussion_r1490159249


##########
accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import accord.api.Result;
+import accord.coordinate.tracking.QuorumTracker;
+import accord.local.CommandStore;
+import accord.local.Node;
+import accord.messages.Callback;
+import accord.messages.GetEphemeralReadDeps;
+import accord.messages.GetEphemeralReadDeps.GetEphemeralReadDepsOk;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.topology.Topologies;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+import static accord.coordinate.tracking.RequestStatus.Failed;
+import static accord.coordinate.tracking.RequestStatus.Success;
+
+public class CoordinateEphemeralRead extends AbstractCoordinatePreAccept<Result, GetEphemeralReadDepsOk>

Review Comment:
   Needs a comment explaining how an ephemeral read is coordinated and what it 
does differently.
   
   When we talked about it, some key differences that came out were:
   1. The ephemeral read isn't witnessed by other transactions and doesn't 
create persistent transaction state.
   2. It only does the preaccept round (and extra rounds), not accept because...
   3. It doesn't pick an `executeAt` per se; it just uses its original 
transaction id.
   4. It waits for the deps it finds to execute and then executes.
   5. It does not run Apply, since no one is waiting for apply and there is no 
persistent state.
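   
   To make the list above concrete, here is a minimal sketch of which phases 
an ephemeral read keeps and which it skips. Everything in it is hypothetical 
(`CoordinationPhasesSketch`, the `Phase` enum and the method names are not 
Accord types), and the "regular transaction" side is deliberately simplified to 
the phases named in this comment.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model for illustration only; none of these names exist in Accord.
final class CoordinationPhasesSketch
{
    // Only the phases mentioned in the review comment are modelled here.
    enum Phase { PRE_ACCEPT, ACCEPT, WAIT_FOR_DEPS, EXECUTE, APPLY }

    /** Simplified assumption: a regular transaction may go through every modelled phase. */
    static Set<Phase> regularTransaction()
    {
        return EnumSet.allOf(Phase.class);
    }

    /**
     * An ephemeral read as described in the comment: the preaccept round
     * (plus any extra rounds), then wait for the collected deps, then execute.
     * No accept round and no Apply, because no persistent state is created.
     */
    static Set<Phase> ephemeralRead()
    {
        return EnumSet.of(Phase.PRE_ACCEPT, Phase.WAIT_FOR_DEPS, Phase.EXECUTE);
    }

    /** The phases a regular transaction may run that an ephemeral read does not. */
    static Set<Phase> skippedByEphemeralRead()
    {
        Set<Phase> skipped = EnumSet.copyOf(regularTransaction());
        skipped.removeAll(ephemeralRead());
        return skipped;
    }
}
```

   Something along these lines, written as prose in the class Javadoc, would 
probably be enough for a reader new to the class.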



##########
accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import accord.coordinate.tracking.QuorumTracker;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.messages.Callback;
+import accord.primitives.FullRoute;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.topology.Topologies;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResults.SettableResult;
+
+import static accord.coordinate.tracking.RequestStatus.Failed;
+import static accord.coordinate.tracking.RequestStatus.Success;
+
+/**
+ * Abstract parent class for implementing preaccept-like operations where we 
may need to fetch additional replies
+ * from future epochs.
+ */
+abstract class AbstractCoordinatePreAccept<T, R> extends SettableResult<T> 
implements Callback<R>, BiConsumer<T, Throwable>
+{
+    class ExtraEpochs implements Callback<R>
+    {
+        final QuorumTracker tracker;
+        private boolean extraRoundIsDone;
+
+        ExtraEpochs(long fromEpoch, long toEpoch)
+        {
+            Topologies topologies = node.topology().preciseEpochs(route, 
fromEpoch, toEpoch);
+            this.tracker = new QuorumTracker(topologies);
+        }
+
+        void start()
+        {
+            // TODO (desired, efficiency): consider sending only to electorate 
of most recent topology (as only these PreAccept votes matter)
+            // note that we must send to all replicas of old topology, as 
electorate may not be reachable
+            contact(tracker.topologies().nodes(), topologies, this);
+        }
+
+        @Override
+        public void onFailure(Id from, Throwable failure)
+        {
+            synchronized (AbstractCoordinatePreAccept.this)
+            {
+                if (!extraRoundIsDone && tracker.recordFailure(from) == Failed)
+                    setFailure(failure);
+            }
+        }
+
+        @Override
+        public void onCallbackFailure(Id from, Throwable failure)
+        {
+            AbstractCoordinatePreAccept.this.onCallbackFailure(from, failure);
+        }
+
+        @Override
+        public void onSuccess(Id from, R reply)
+        {
+            synchronized (AbstractCoordinatePreAccept.this)
+            {
+                if (!extraRoundIsDone)
+                {
+                    if (!onExtraSuccessInternal(from, reply))
+                        setFailure(new Preempted(txnId, route.homeKey()));
+                    else if (tracker.recordSuccess(from) == Success)
+                        onPreAcceptedOrNewEpoch();
+                }
+            }
+        }
+    }
+
+    final Node node;
+    final TxnId txnId;
+    final Txn txn;
+    final FullRoute<?> route;
+
+    private Topologies topologies;
+    private boolean initialRoundIsDone;
+    private ExtraEpochs extraEpochs;
+    private Map<Id, Object> debug = Invariants.debug() ? new LinkedHashMap<>() 
: null;
+
+    AbstractCoordinatePreAccept(Node node, FullRoute<?> route, TxnId txnId, 
Txn txn)
+    {
+        this(node, txnId, txn, route, 
node.topology().withUnsyncedEpochs(route, txnId, txnId));
+    }
+
+    AbstractCoordinatePreAccept(Node node, TxnId txnId, Txn txn, FullRoute<?> 
route, Topologies topologies)
+    {
+        this.node = node;
+        this.txnId = txnId;
+        this.txn = txn;
+        this.route = route;
+        this.topologies = topologies;
+    }
+
+    final void start()
+    {
+        contact(topologies.nodes(), topologies, this);
+    }
+
+    abstract void contact(Set<Id> nodes, Topologies topologies, Callback<R> 
callback);
+    abstract void onSuccessInternal(Id from, R reply);
+    /**
+     * The tracker for the extra rounds only is provided by the 
AbstractCoordinatePreAccept, so we expect a boolean back
+     * indicating if the "success" reply was actually a good response or a 
failure (i.e. preempted)
+     */
+    abstract boolean onExtraSuccessInternal(Id from, R reply);
+    abstract void onFailureInternal(Id from, Throwable failure);
+    abstract void onNewEpochTopologyMismatch(TopologyMismatch mismatch);
+    abstract void onPreAccepted(Topologies topologies);
+    abstract long executeAtEpoch();
+
+    @Override
+    public synchronized final void onFailure(Id from, Throwable failure)
+    {
+        if (debug != null) debug.putIfAbsent(from, failure);
+        if (!initialRoundIsDone)
+            onFailureInternal(from, failure);
+    }
+
+    @Override
+    public final synchronized void onCallbackFailure(Id from, Throwable 
failure)
+    {
+        initialRoundIsDone = true;
+        if (extraEpochs != null)
+            extraEpochs.extraRoundIsDone = true;
+
+        tryFailure(failure);
+    }
+
+    @Override
+    public final synchronized void onSuccess(Id from, R reply)
+    {
+        if (debug != null) debug.putIfAbsent(from, reply);
+        if (!initialRoundIsDone)
+            onSuccessInternal(from, reply);
+    }
+
+    @Override
+    public final void setFailure(Throwable failure)
+    {
+        Invariants.checkState(!initialRoundIsDone || (extraEpochs != null && 
!extraEpochs.extraRoundIsDone));
+        initialRoundIsDone = true;
+        if (extraEpochs != null)
+            extraEpochs.extraRoundIsDone = true;
+
+        if (failure instanceof CoordinationFailed)
+        {
+            ((CoordinationFailed) failure).set(txnId, route.homeKey());
+            if (failure instanceof Timeout)
+                node.agent().metricsEventsListener().onTimeout(txnId);
+            else if (failure instanceof Preempted)
+                node.agent().metricsEventsListener().onPreempted(txnId);
+            else if (failure instanceof Invalidated)
+                node.agent().metricsEventsListener().onInvalidated(txnId);
+        }
+        super.setFailure(failure);
+    }
+
+    final void onPreAcceptedOrNewEpoch()
+    {
+        Invariants.checkState(!initialRoundIsDone || (extraEpochs != null && 
!extraEpochs.extraRoundIsDone));
+        initialRoundIsDone = true;
+        if (extraEpochs != null)
+            extraEpochs.extraRoundIsDone = true;
+
+        // if the epoch we are accepting in is later, we *must* contact the 
later epoch for pre-accept, as this epoch
+        // could have moved ahead, and the timestamp we may propose may be 
stale.
+        // Note that these future epochs are non-voting, they only serve to 
inform the timestamp we decide
+        long latestEpoch = executeAtEpoch();
+        if (latestEpoch <= topologies.currentEpoch())
+            onPreAccepted(topologies);
+        else
+            onNewEpoch(topologies, latestEpoch);
+    }
+
+    final void onNewEpoch(Topologies prevTopologies, long latestEpoch)
+    {
+        // TODO (desired, efficiency): check if we have already have a valid 
quorum for the future epoch
+        //  (noting that nodes may have adopted new ranges, in which case they 
should be discounted, and quorums may have changed shape)
+        node.withEpoch(latestEpoch, () -> {
+            TopologyMismatch mismatch = 
TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(latestEpoch), 
txnId, route.homeKey(), txn.keys());
+            if (mismatch != null)
+            {
+                initialRoundIsDone = true;
+                onNewEpochTopologyMismatch(mismatch);
+                return;
+            }
+            topologies = node.topology().withUnsyncedEpochs(route, 
txnId.epoch(), latestEpoch);
+            boolean equivalent = topologies.oldestEpoch() <= 
prevTopologies.currentEpoch();
+            for (long epoch = topologies.currentEpoch() ; equivalent && epoch 
> prevTopologies.currentEpoch() ; --epoch)
+                equivalent = 
topologies.forEpoch(epoch).shards().equals(prevTopologies.current().shards());
+
+            if (equivalent)
+            {
+                onPreAccepted(topologies);
+            }
+            else
+            {
+                extraEpochs = new ExtraEpochs(prevTopologies.currentEpoch() + 1, latestEpoch);

Review Comment:
   Just so I understand, we might create multiple `ExtraEpochs` if the 
`executeAt` keeps moving forward each time we attempt `PreAccept`?
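   
   My reading of the quoted code, as a small standalone sketch: each completed 
round calls `onPreAcceptedOrNewEpoch()`, which compares `executeAtEpoch()` with 
the epoch already covered and starts another `ExtraEpochs` round if it has 
moved. The class and method below are hypothetical, the sketch assumes the 
covered epoch becomes `latestEpoch` after each extra round, and it ignores the 
`equivalent` shortcut that skips the round when the shards have not changed.

```java
// Hypothetical model of the control flow; not Accord code.
final class ExtraRoundsSketch
{
    /**
     * @param startEpoch the epoch covered by the initial round
     * @param latestEpochAfterRound entry i is the highest epoch reported by the
     *        replies once round i has completed (round 0 is the initial round)
     * @return how many extra rounds would be started
     */
    static int extraRounds(long startEpoch, long[] latestEpochAfterRound)
    {
        long coveredEpoch = startEpoch;
        int extraRounds = 0;
        for (long latestEpoch : latestEpochAfterRound)
        {
            // Mirrors "latestEpoch <= topologies.currentEpoch()": we are done.
            if (latestEpoch <= coveredEpoch)
                break;

            // Mirrors "new ExtraEpochs(prevTopologies.currentEpoch() + 1, latestEpoch)".
            extraRounds++;
            coveredEpoch = latestEpoch;
        }
        return extraRounds;
    }
}
```

   If that reading is right, the number of `ExtraEpochs` instances is bounded 
only by how often the reported epoch advances while we are coordinating.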



##########
accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java:
##########
@@ -210,92 +115,32 @@ public synchronized void onSuccess(Id from, PreAcceptReply reply)
     }
 
     @Override
-    public void setFailure(Throwable failure)
+    boolean onExtraSuccessInternal(Id from, PreAcceptReply reply)
     {
-        Invariants.checkState(!initialPreAcceptIsDone || (extraPreAccept != 
null && !extraPreAccept.extraPreAcceptIsDone));
-        initialPreAcceptIsDone = true;
-        if (extraPreAccept != null)
-            extraPreAccept.extraPreAcceptIsDone = true;
-        if (failure instanceof CoordinationFailed)
-        {
-            ((CoordinationFailed) failure).set(txnId, route.homeKey());
-            if (failure instanceof Timeout)
-                node.agent().metricsEventsListener().onTimeout(txnId);
-            else if (failure instanceof Preempted)
-                node.agent().metricsEventsListener().onPreempted(txnId);
-            else if (failure instanceof Invalidated)
-                node.agent().metricsEventsListener().onInvalidated(txnId);
-        }
-        super.setFailure(failure);
-    }
-
-    void onPreAcceptedOrNewEpoch()
-    {
-        Invariants.checkState(!initialPreAcceptIsDone || (extraPreAccept != 
null && !extraPreAccept.extraPreAcceptIsDone));
-        initialPreAcceptIsDone = true;
-        if (extraPreAccept != null)
-            extraPreAccept.extraPreAcceptIsDone = true;
+        if (!reply.isOk())
+            return false;
 
-        // if the epoch we are accepting in is later, we *must* contact the 
later epoch for pre-accept, as this epoch
-        // could have moved ahead, and the timestamp we may propose may be 
stale.
-        // Note that these future epochs are non-voting, they only serve to 
inform the timestamp we decide
-        Timestamp executeAt = foldl(successes, (ok, prev) -> 
mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
-        if (executeAt.epoch() <= topologies.currentEpoch())
-            onPreAccepted(topologies, executeAt, successes);
-        else
-            onNewEpoch(topologies, executeAt, successes);
+        PreAcceptOk ok = (PreAcceptOk) reply;
+        oks.add(ok);
+        return true;
     }
 
-    void onNewEpoch(Topologies prevTopologies, Timestamp executeAt, 
List<PreAcceptOk> successes)
+    @Override
+    void onNewEpochTopologyMismatch(TopologyMismatch mismatch)
     {
-        // TODO (desired, efficiency): check if we have already have a valid 
quorum for the future epoch
-        //  (noting that nodes may have adopted new ranges, in which case they 
should be discounted, and quorums may have changed shape)
-        node.withEpoch(executeAt.epoch(), () -> {
-            TopologyMismatch mismatch = 
TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(executeAt.epoch()),
 txnId, route.homeKey(), txn.keys());
-            if (mismatch != null)
-            {
-                initialPreAcceptIsDone = true;
-                Propose.Invalidate.proposeInvalidate(node, new 
Ballot(node.uniqueNow()), txnId, route.someParticipatingKey(), (outcome, 
failure) -> {
-                    if (failure != null)
-                        mismatch.addSuppressed(failure);
-                    accept(null, mismatch);
-                });
-                return;
-            }
-            topologies = node.topology().withUnsyncedEpochs(route, 
txnId.epoch(), executeAt.epoch());
-            boolean equivalent = topologies.oldestEpoch() <= 
prevTopologies.currentEpoch();
-            for (long epoch = topologies.currentEpoch() ; equivalent && epoch 
> prevTopologies.currentEpoch() ; --epoch)
-                equivalent = 
topologies.forEpoch(epoch).shards().equals(prevTopologies.current().shards());
-
-            if (equivalent)
-            {
-                onPreAccepted(topologies, executeAt, successes);
-            }
-            else
-            {
-                extraPreAccept = new 
ExtraPreAccept(prevTopologies.currentEpoch() + 1, executeAt.epoch());
-                extraPreAccept.start();
-            }
+        Propose.Invalidate.proposeInvalidate(node, new Ballot(node.uniqueNow()), txnId, route.someParticipatingKey(), (outcome, failure) -> {

Review Comment:
   This really needs a comment explaining why we invalidate in this mismatch 
case, and it looks like we return an error to the client?



##########
accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import accord.api.Result;
+import accord.coordinate.tracking.QuorumTracker;
+import accord.local.CommandStore;
+import accord.local.Node;
+import accord.messages.Callback;
+import accord.messages.GetEphemeralReadDeps;
+import accord.messages.GetEphemeralReadDeps.GetEphemeralReadDepsOk;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.topology.Topologies;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+import static accord.coordinate.tracking.RequestStatus.Failed;
+import static accord.coordinate.tracking.RequestStatus.Success;
+
+public class CoordinateEphemeralRead extends 
AbstractCoordinatePreAccept<Result, GetEphemeralReadDepsOk>
+{
+    public static AsyncResult<Result> coordinate(Node node, FullRoute<?> 
route, TxnId txnId, Txn txn)
+    {
+        TopologyMismatch mismatch = 
TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(txnId.epoch()),
 txnId, route.homeKey(), txn.keys());
+        if (mismatch != null)
+            return AsyncResults.failure(mismatch);
+
+        Topologies topologies = node.topology().withUnsyncedEpochs(route, 
txnId, txnId);
+        CoordinateEphemeralRead coordinate = new CoordinateEphemeralRead(node, 
topologies, route, txnId, txn);
+        coordinate.start();
+        return coordinate;
+    }
+
+    private final QuorumTracker tracker;
+    private final List<GetEphemeralReadDepsOk> oks;
+    private long executeAtEpoch;
+
+    CoordinateEphemeralRead(Node node, Topologies topologies, FullRoute<?> 
route, TxnId txnId, Txn txn)
+    {
+        super(node, route, txnId, txn);
+        this.tracker = new QuorumTracker(topologies);
+        this.executeAtEpoch = txnId.epoch();
+        this.oks = new ArrayList<>(topologies.estimateUniqueNodes());
+    }
+
+    @Override
+    void contact(Set<Node.Id> nodes, Topologies topologies, 
Callback<GetEphemeralReadDepsOk> callback)
+    {
+        CommandStore commandStore = CommandStore.maybeCurrent();
+        if (commandStore == null) commandStore = 
node.commandStores().select(route.homeKey());
+        node.send(nodes, to -> new GetEphemeralReadDeps(to, topologies, route, 
txnId, txn.keys(), executeAtEpoch), commandStore, callback);
+    }
+
+    @Override
+    long executeAtEpoch()
+    {
+        return executeAtEpoch;
+    }
+
+    @Override
+    public void onSuccessInternal(Node.Id from, GetEphemeralReadDepsOk ok)
+    {
+        oks.add(ok);
+        if (ok.latestEpoch > executeAtEpoch)
+            executeAtEpoch = ok.latestEpoch;
+
+        if (tracker.recordSuccess(from) == Success)
+            onPreAcceptedOrNewEpoch();
+    }
+
+    @Override
+    boolean onExtraSuccessInternal(Node.Id from, GetEphemeralReadDepsOk ok)
+    {
+        if (ok.latestEpoch > executeAtEpoch)
+            executeAtEpoch = ok.latestEpoch;
+
+        oks.add(ok);
+        return true;
+    }
+
+    @Override
+    public void onFailureInternal(Node.Id from, Throwable failure)
+    {
+        if (tracker.recordFailure(from) == Failed)
+            setFailure(new Timeout(txnId, route.homeKey()));
+    }
+
+    @Override
+    void onNewEpochTopologyMismatch(TopologyMismatch mismatch)
+    {
+        accept(null, mismatch);
+    }
+
+    @Override
+    void onPreAccepted(Topologies topologies)
+    {
+        Deps deps = Deps.merge(oks, ok -> ok.deps);
+        new ExecuteEphemeralRead(node, topologies, route, txnId, txn, executeAtEpoch, deps, this).start();

Review Comment:
   Can we bump the epoch in the `txnid` for the read to the `executeAtEpoch` so 
that when the read happens it can check whether that epoch has a migration 
happening?
   
   I think that would address the issue of doing a Paxos write with async 
commit, then doing an Accord read at a different node that doesn't know about 
the migration to Paxos, and having Accord accept the read. With the later epoch, 
Cassandra will reject the read in `TxnQuery`.
   
   I think this gives you read-your-own-writes in this base case. There might 
be more complex cases where it still doesn't hold because the read can still 
occur in a later epoch, but it at least puts in a floor.
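   
   A rough sketch of the idea, using made-up stand-ins rather than the real 
types: `ReadId` is not Accord's `TxnId`, `acceptRead` is not the actual check 
in `TxnQuery`, and whether the id can safely be rewritten like this in Accord 
is an open question. It only shows how raising the epoch acts as a floor.

```java
// Hypothetical stand-ins; not Accord's TxnId or Cassandra's TxnQuery.
final class EpochFloorSketch
{
    static final class ReadId
    {
        final long epoch;
        final long hlc;

        ReadId(long epoch, long hlc)
        {
            this.epoch = epoch;
            this.hlc = hlc;
        }

        /** Returns an id whose epoch is at least executeAtEpoch; the epoch is never lowered. */
        ReadId withEpochAtLeast(long executeAtEpoch)
        {
            return executeAtEpoch > epoch ? new ReadId(executeAtEpoch, hlc) : this;
        }
    }

    /**
     * Stand-in for the read-time check: accept the read only if its epoch is
     * earlier than the (assumed known) epoch in which the migration started.
     */
    static boolean acceptRead(ReadId id, long migrationStartEpoch)
    {
        return id.epoch < migrationStartEpoch;
    }
}
```

   With the original epoch the read would be accepted by a node that only 
looks at the id; with the epoch raised to `executeAtEpoch` the same check 
rejects it once the migration epoch has been reached.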



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]