belliottsmith commented on code in PR #7:
URL: https://github.com/apache/cassandra-accord/pull/7#discussion_r1012120628


##########
accord-core/src/main/java/accord/impl/SimpleProgressLog.java:
##########
@@ -438,77 +438,94 @@ void update(Node node, TxnId txnId, Command command)
                 return;
             }
 
+            if (command.hasBeen(blockedUntil))
+            {
+                if (command.hasBeen(ReplicationPhase.Apply)) recordApply();
+                else if (command.hasBeen(ReplicationPhase.Commit)) recordCommit();
+                return;
+            }
+
             progress = Investigating;
-            // check status with the only keys we know, if any, then:
-            // 1. if we cannot find any primary record of the transaction, then it cannot be a dependency so record this fact
-            // 2. otherwise record the homeKey for future reference and set the status based on whether progress has been made
-            long onEpoch = (command.hasBeen(Status.Committed) ? command.executeAt() : txnId).epoch;
-            node.withEpoch(onEpoch, () -> {
-                Key someKey; Keys someKeys; {
-                    Keys tmpKeys = Keys.union(this.someKeys, command.someKeys());
-                    someKey = command.homeKey() == null ? tmpKeys.get(0) : command.homeKey();
-                    someKeys = tmpKeys.with(someKey);
+            // first make sure we have enough information to obtain the command locally
+            boolean canExecute = command.hasBeen(Status.Committed);
+            long srcEpoch = (command.hasBeen(Status.Committed) ? command.executeAt() : txnId).epoch;
+            // TODO: compute fromEpoch, the epoch we already have this txn replicated until
+            long toEpoch = Math.max(srcEpoch, node.topology().epoch());
+            node.withEpoch(srcEpoch, () -> {
+
+                // first check we have enough routing information for the ranges we own; if not, fetch it
+                AbstractRoute route = route(command);
+                KeyRanges ranges = node.topology().localRangesForEpochs(txnId.epoch, toEpoch);
+                if (route == null || !route.covers(ranges))
+                {
+                    ReplicationPhase blockedOn = this.blockedUntil;
+                    BiConsumer<FindRoute.Result, Throwable> foundRoute = (findRoute, fail) -> {
+                        if (progress == Investigating && blockedOn == this.blockedUntil)
+                        {
+                            progress = Expected;
+                            if (findRoute != null && findRoute.route != null && !(blockedOnKeys instanceof Route))
+                                blockedOnKeys = findRoute.route;
+                            if (findRoute == null && fail == null)
+                                invalidate(node, command);
+                        }
+                    };
+
+                    if (command.homeKey() != null || route != null)
+                    {
+                        RoutingKey homeKey = route != null ? route.homeKey : command.homeKey();
+                        debugInvestigating = FindRoute.findRoute(node, txnId, homeKey, foundRoute);
+                    }
+                    else
+                    {
+                        RoutingKeys someKeys = this.blockedOnKeys;
+                        if (someKeys == null || someKeys.isEmpty()) someKeys = route;

Review Comment:
   I actually meant to move this logic into `FetchData.fetchWithSomeKey` and `FetchData.fetchWithHomeKey`, so I have now finished that refactor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

