aweisberg commented on code in PR #80:
URL: https://github.com/apache/cassandra-accord/pull/80#discussion_r1500854458


##########
accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java:
##########
@@ -210,92 +115,32 @@ public synchronized void onSuccess(Id from, 
PreAcceptReply reply)
     }
 
     @Override
-    public void setFailure(Throwable failure)
+    boolean onExtraSuccessInternal(Id from, PreAcceptReply reply)
     {
-        Invariants.checkState(!initialPreAcceptIsDone || (extraPreAccept != 
null && !extraPreAccept.extraPreAcceptIsDone));
-        initialPreAcceptIsDone = true;
-        if (extraPreAccept != null)
-            extraPreAccept.extraPreAcceptIsDone = true;
-        if (failure instanceof CoordinationFailed)
-        {
-            ((CoordinationFailed) failure).set(txnId, route.homeKey());
-            if (failure instanceof Timeout)
-                node.agent().metricsEventsListener().onTimeout(txnId);
-            else if (failure instanceof Preempted)
-                node.agent().metricsEventsListener().onPreempted(txnId);
-            else if (failure instanceof Invalidated)
-                node.agent().metricsEventsListener().onInvalidated(txnId);
-        }
-        super.setFailure(failure);
-    }
-
-    void onPreAcceptedOrNewEpoch()
-    {
-        Invariants.checkState(!initialPreAcceptIsDone || (extraPreAccept != 
null && !extraPreAccept.extraPreAcceptIsDone));
-        initialPreAcceptIsDone = true;
-        if (extraPreAccept != null)
-            extraPreAccept.extraPreAcceptIsDone = true;
+        if (!reply.isOk())
+            return false;
 
-        // if the epoch we are accepting in is later, we *must* contact the 
later epoch for pre-accept, as this epoch
-        // could have moved ahead, and the timestamp we may propose may be 
stale.
-        // Note that these future epochs are non-voting, they only serve to 
inform the timestamp we decide
-        Timestamp executeAt = foldl(successes, (ok, prev) -> 
mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
-        if (executeAt.epoch() <= topologies.currentEpoch())
-            onPreAccepted(topologies, executeAt, successes);
-        else
-            onNewEpoch(topologies, executeAt, successes);
+        PreAcceptOk ok = (PreAcceptOk) reply;
+        oks.add(ok);
+        return true;
     }
 
-    void onNewEpoch(Topologies prevTopologies, Timestamp executeAt, 
List<PreAcceptOk> successes)
+    @Override
+    void onNewEpochTopologyMismatch(TopologyMismatch mismatch)
     {
-        // TODO (desired, efficiency): check if we have already have a valid 
quorum for the future epoch
-        //  (noting that nodes may have adopted new ranges, in which case they 
should be discounted, and quorums may have changed shape)
-        node.withEpoch(executeAt.epoch(), () -> {
-            TopologyMismatch mismatch = 
TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(executeAt.epoch()),
 txnId, route.homeKey(), txn.keys());
-            if (mismatch != null)
-            {
-                initialPreAcceptIsDone = true;
-                Propose.Invalidate.proposeInvalidate(node, new 
Ballot(node.uniqueNow()), txnId, route.someParticipatingKey(), (outcome, 
failure) -> {
-                    if (failure != null)
-                        mismatch.addSuppressed(failure);
-                    accept(null, mismatch);
-                });
-                return;
-            }
-            topologies = node.topology().withUnsyncedEpochs(route, 
txnId.epoch(), executeAt.epoch());
-            boolean equivalent = topologies.oldestEpoch() <= 
prevTopologies.currentEpoch();
-            for (long epoch = topologies.currentEpoch() ; equivalent && epoch 
> prevTopologies.currentEpoch() ; --epoch)
-                equivalent = 
topologies.forEpoch(epoch).shards().equals(prevTopologies.current().shards());
-
-            if (equivalent)
-            {
-                onPreAccepted(topologies, executeAt, successes);
-            }
-            else
-            {
-                extraPreAccept = new 
ExtraPreAccept(prevTopologies.currentEpoch() + 1, executeAt.epoch());
-                extraPreAccept.start();
-            }
+        Propose.Invalidate.proposeInvalidate(node, new 
Ballot(node.uniqueNow()), txnId, route.someParticipatingKey(), (outcome, 
failure) -> {

Review Comment:
   Another question that comes to mind is that the actual execution isn't 
guaranteed to take place in the `executeAt` epoch so presumably when we read a 
given key or range there is validation that the node still actually has that 
data?
   
   Or are we relying on the node not having done any cleanup so it might have 
stale data, but that still meets the per key linearizability bar?
   
   This brings up back to cleanup and how it will integrate with Accord. Since 
ephemeral reads aren't visible to other transactions it's not like you can run 
a barrier to know what data is safe to drop?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to