bdeggleston commented on code in PR #5:
URL: https://github.com/apache/cassandra-accord/pull/5#discussion_r927973637


##########
accord-core/src/main/java/accord/coordinate/Coordinate.java:
##########
@@ -1,46 +1,283 @@
 package accord.coordinate;
 
-import accord.api.ConfigurationService;
-
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
 
 import accord.api.Key;
-import accord.local.Node;
 import accord.api.Result;
-import accord.local.Node;
+import accord.coordinate.tracking.FastPathTracker;
+import accord.topology.Shard;
+import accord.topology.Topologies;
 import accord.txn.Ballot;
+import accord.messages.Callback;
+import accord.local.Node;
+import accord.txn.Dependencies;
+import accord.local.Node.Id;
+import accord.txn.Timestamp;
+import accord.messages.PreAccept;
+import accord.messages.PreAccept.PreAcceptOk;
 import accord.txn.Txn;
 import accord.txn.TxnId;
+import accord.messages.PreAccept.PreAcceptReply;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.utils.concurrent.AsyncFuture;
 import org.apache.cassandra.utils.concurrent.Future;
 
-public class Coordinate
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached 
agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them 
complete (and notify us about the execution result)
+ */
+public class Coordinate extends AsyncFuture<Result> implements 
Callback<PreAcceptReply>, BiConsumer<Result, Throwable>

Review Comment:
   WDYT about pulling the preaccept phase into its own class and composing 
preaccept and the other phases in the Coordinate class? On the one hand, the 
top-level operations (coordinate, recover, etc.) each have a unique first stage, 
so I understand the reasoning for bundling that stage into their respective 
classes. OTOH, each operation is a composition of several phases, and it would 
make more sense to me if they were modeled that way.



##########
accord-core/src/main/java/accord/impl/SimpleProgressLog.java:
##########
@@ -129,6 +131,7 @@ boolean isAtLeast(GlobalStatus equalOrGreaterThan)
         Set<Id> globalNotPersisted;
         GlobalPendingDurable globalPendingDurable;
 
+        Object debugInvestigating;

Review Comment:
   I'm assuming this can be removed?



##########
accord-core/src/test/java/accord/burn/BurnTest.java:
##########
@@ -229,6 +233,7 @@ static void burn(Random random, TopologyFactory 
topologyFactory, List<Id> client
     public static void main(String[] args) throws Exception
     {
         Long overrideSeed = null;
+//        Long overrideSeed = -7320078316311161123L;

Review Comment:
   leftover seed



##########
accord-core/src/main/java/accord/coordinate/Coordinate.java:
##########
@@ -1,46 +1,283 @@
 package accord.coordinate;
 
-import accord.api.ConfigurationService;
-
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
 
 import accord.api.Key;
-import accord.local.Node;
 import accord.api.Result;
-import accord.local.Node;
+import accord.coordinate.tracking.FastPathTracker;
+import accord.topology.Shard;
+import accord.topology.Topologies;
 import accord.txn.Ballot;
+import accord.messages.Callback;
+import accord.local.Node;
+import accord.txn.Dependencies;
+import accord.local.Node.Id;
+import accord.txn.Timestamp;
+import accord.messages.PreAccept;
+import accord.messages.PreAccept.PreAcceptOk;
 import accord.txn.Txn;
 import accord.txn.TxnId;
+import accord.messages.PreAccept.PreAcceptReply;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.utils.concurrent.AsyncFuture;
 import org.apache.cassandra.utils.concurrent.Future;
 
-public class Coordinate
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached 
agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them 
complete (and notify us about the execution result)
+ */
+public class Coordinate extends AsyncFuture<Result> implements 
Callback<PreAcceptReply>, BiConsumer<Result, Throwable>
 {
-    private static Future<Result> fetchEpochOrExecute(Node node, Agreed agreed)
+    static class ShardTracker extends FastPathTracker.FastPathShardTracker
+    {
+        public ShardTracker(Shard shard)
+        {
+            super(shard);
+        }
+
+        @Override
+        public boolean includeInFastPath(Node.Id node, boolean 
withFastPathTimestamp)
+        {
+            return withFastPathTimestamp && 
shard.fastPathElectorate.contains(node);
+        }
+
+        @Override
+        public boolean hasMetFastPathCriteria()
+        {
+            return fastPathAccepts >= shard.fastPathQuorumSize;
+        }
+    }
+
+    static class PreacceptTracker extends FastPathTracker<ShardTracker>
+    {
+        volatile long supersedingEpoch = -1;
+        private final boolean fastPathPermitted;
+        private final Set<Id> successes = new HashSet<>();
+        private Set<Id> failures;
+
+        public PreacceptTracker(Topologies topologies, boolean 
fastPathPermitted)
+        {
+            super(topologies, Coordinate.ShardTracker[]::new, 
Coordinate.ShardTracker::new);
+            this.fastPathPermitted = fastPathPermitted;
+        }
+
+        public PreacceptTracker(Topologies topologies)
+        {
+            this(topologies, topologies.fastPathPermitted());
+        }
+
+        @Override
+        public boolean failure(Id node)
+        {
+            if (failures == null)
+                failures = new HashSet<>();
+            failures.add(node);
+            return super.failure(node);
+        }
+
+        @Override
+        public void recordSuccess(Id node, boolean withFastPathTimestamp)
+        {
+            successes.add(node);
+            super.recordSuccess(node, withFastPathTimestamp);
+        }
+
+        public void recordSuccess(Id node)
+        {
+            recordSuccess(node, false);
+        }
+
+        public synchronized boolean recordSupersedingEpoch(long epoch)
+        {
+            if (epoch <= supersedingEpoch)
+                return false;
+            supersedingEpoch = epoch;
+            return true;
+        }
+
+        public boolean hasSupersedingEpoch()
+        {
+            return supersedingEpoch > 0;
+        }
+
+        public PreacceptTracker withUpdatedTopologies(Topologies topologies)
+        {
+            PreacceptTracker tracker = new PreacceptTracker(topologies, false);
+            successes.forEach(tracker::recordSuccess);
+            if (failures != null)
+                failures.forEach(tracker::failure);
+            return tracker;
+        }
+
+        @Override
+        public boolean hasMetFastPathCriteria()
+        {
+            return fastPathPermitted && super.hasMetFastPathCriteria();
+        }
+
+        boolean shouldSlowPathAccept()
+        {
+            return (!fastPathPermitted || !hasInFlight()) && 
hasReachedQuorum();
+        }
+    }
+
+    final Node node;
+    final TxnId txnId;
+    final Txn txn;
+    final Key homeKey;
+
+    private PreacceptTracker tracker;
+    private final List<PreAcceptOk> preAcceptOks = new ArrayList<>();
+    private boolean isPreAccepted;

Review Comment:
   If we decide not to separate preaccept from coordinate: `isDone` might be 
too generic for the Coordinate class, but `isPreAccepted` implies (to me) that 
preaccept was successful, so it is odd to see it set to true in `onFailure`. 
Maybe `preacceptDone` or something similar?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to