belliottsmith commented on code in PR #7:
URL: https://github.com/apache/cassandra-accord/pull/7#discussion_r1014295387


##########
accord-core/src/main/java/accord/messages/TxnRequest.java:
##########
@@ -18,108 +18,105 @@
 
 package accord.messages;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.util.function.BiFunction;
+
+import accord.local.SafeCommandStore;
+import accord.utils.MapReduceConsume;
 import com.google.common.base.Preconditions;
 
-import accord.api.Key;
+import accord.api.RoutingKey;
 import accord.local.Node;
 import accord.local.Node.Id;
+import accord.primitives.AbstractKeys;
+import accord.primitives.AbstractRoute;
 import accord.primitives.KeyRanges;
 import accord.local.PreLoadContext;
+import accord.primitives.PartialRoute;
+import accord.primitives.Route;
 import accord.topology.Topologies;
 import accord.topology.Topology;
-import accord.primitives.Keys;
 import accord.primitives.TxnId;
 
 import static java.lang.Long.min;
 
-public abstract class TxnRequest implements EpochRequest, PreLoadContext
+public abstract class TxnRequest<R> implements EpochRequest, PreLoadContext, 
MapReduceConsume<SafeCommandStore, R>
 {
-    public static abstract class WithUnsynced extends TxnRequest
+    public static abstract class WithUnsynced<R> extends TxnRequest<R>
     {
-        public final TxnId txnId;
-        public final long minEpoch;
-        protected final boolean doNotComputeProgressKey;
+        public final long minEpoch; // TODO: can this just always be 
TxnId.epoch?
+        public final boolean doNotComputeProgressKey;
 
-        public WithUnsynced(Id to, Topologies topologies, Keys keys, TxnId 
txnId)
+        protected WithUnsynced(Id to, Topologies topologies, TxnId txnId, 
Route route)
         {
-            this(to, topologies, keys, txnId, latestRelevantEpochIndex(to, 
topologies, keys));
+            this(to, topologies, txnId, route, latestRelevantEpochIndex(to, 
topologies, route));
         }
 
-        private WithUnsynced(Id to, Topologies topologies, Keys keys, TxnId 
txnId, int startIndex)
+        protected WithUnsynced(Id to, Topologies topologies, TxnId txnId, 
Route route, int startIndex)
         {
-            super(to, topologies, keys, startIndex);
-            this.txnId = txnId;
+            super(to, topologies, route, txnId, startIndex);
             this.minEpoch = topologies.oldestEpoch();
-            // to understand this calculation we must bear in mind the 
following:
-            //  - startIndex is the "latest relevant" which means we skip over 
recent epochs where we are not owners at all,
-            //    i.e. if this node does not participate in the most recent 
epoch, startIndex > 0
-            //  - waitForEpoch gives us the most recent epoch with differing 
ownership information, starting from startIndex
-            // So, we can have some surprising situations arise where a 
*prior* owner must be contacted for its vote,
-            // and does not need to wait for the latest ring information 
because from the point of view of its contribution
-            // the stale ring information is sufficient, however we do not 
want it to compute a progress key with this stale
-            // ring information and mistakenly believe that it is a home shard 
for the transaction, as it will not receive
-            // updates for the transaction going forward.
-            // So in these cases we send a special flag indicating that the 
progress key should not be computed
-            // (as it might be done so with stale ring information)
-            this.doNotComputeProgressKey = waitForEpoch() < txnId.epoch && 
startIndex > 0
-                                           && 
topologies.get(startIndex).epoch() < txnId.epoch;
+            this.doNotComputeProgressKey = doNotComputeProgressKey(topologies, 
startIndex, txnId, waitForEpoch());
 
+            // TODO (soon): alongside Invariants class, introduce PARANOID 
mode for checking extra invariants
             KeyRanges ranges = 
topologies.forEpoch(txnId.epoch).rangesForNode(to);
             if (doNotComputeProgressKey)
             {
-                Preconditions.checkState(ranges == null || 
!ranges.intersects(keys)); // confirm dest is not a replica on txnId.epoch
+                Preconditions.checkState(!ranges.intersects(route)); // 
confirm dest is not a replica on txnId.epoch
             }
             else
             {
-                boolean intersects = ranges != null && ranges.intersects(keys);
+                boolean intersects = ranges.intersects(route);
                 long progressEpoch = Math.min(waitForEpoch(), txnId.epoch);
                 KeyRanges computesRangesOn = 
topologies.forEpoch(progressEpoch).rangesForNode(to);
-                boolean check = computesRangesOn != null && 
computesRangesOn.intersects(keys);
+                boolean check = computesRangesOn != null && 
computesRangesOn.intersects(route);
                 if (check != intersects)
                     throw new IllegalStateException();
             }
         }
 
-        Key progressKey(Node node, Key homeKey)
+        protected WithUnsynced(TxnId txnId, PartialRoute scope, long 
waitForEpoch, long minEpoch, boolean doNotComputeProgressKey)
         {
-            // if waitForEpoch < txnId.epoch, then this replica's ownership is 
unchanged
-            long progressEpoch = min(waitForEpoch(), txnId.epoch);
-            return doNotComputeProgressKey ? null : 
node.trySelectProgressKey(progressEpoch, scope(), homeKey);
+            super(txnId, scope, waitForEpoch);
+            this.minEpoch = minEpoch;
+            this.doNotComputeProgressKey = doNotComputeProgressKey;
         }
 
-        @VisibleForTesting
-        public WithUnsynced(Keys scope, long epoch, TxnId txnId)
+        @Override
+        RoutingKey progressKey(Node node)
         {
-            super(scope, epoch);
-            this.txnId = txnId;
-            this.minEpoch = epoch;
-            this.doNotComputeProgressKey = false;
+            if (doNotComputeProgressKey)
+                return null;
+            return super.progressKey(node);
         }
     }
 
-    private final Keys scope;
-    private final long waitForEpoch;
+    public final TxnId txnId;
+    public final PartialRoute scope;
+    public final long waitForEpoch;
+    protected transient RoutingKey progressKey;
+    protected transient Node node;
+    protected transient Id replyTo;
+    protected transient ReplyContext replyContext;
 
-    public TxnRequest(Node.Id to, Topologies topologies, Keys keys)
+    protected TxnRequest(Id to, Topologies topologies, AbstractRoute route, 
TxnId txnId)
     {
-        this(to, topologies, keys, 0);
+        this(to, topologies, route, txnId, latestRelevantEpochIndex(to, 
topologies, route));
     }
 
-    public TxnRequest(Node.Id to, Topologies topologies, Keys keys, int 
startIndex)
+    protected TxnRequest(Id to, Topologies topologies, AbstractRoute route, 
TxnId txnId, int startIndex)
     {
-        this(computeScope(to, topologies, keys, startIndex),
-             computeWaitForEpoch(to, topologies, startIndex));
+        this(txnId, computeScope(to, topologies, route, startIndex), 
computeWaitForEpoch(to, topologies, startIndex));
     }
 
-    public TxnRequest(Keys scope, long waitForEpoch)
+    protected TxnRequest(TxnId txnId, PartialRoute scope, long waitForEpoch)
     {
         Preconditions.checkState(!scope.isEmpty());
+        this.txnId = txnId;
         this.scope = scope;
         this.waitForEpoch = waitForEpoch;
     }
 
-    public Keys scope()
+    public PartialRoute scope()

Review Comment:
   I have provided a comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to