dcapwell commented on code in PR #141:
URL: https://github.com/apache/cassandra-accord/pull/141#discussion_r1851043178


##########
accord-core/src/main/java/accord/messages/InformDurable.java:
##########
@@ -63,6 +67,22 @@ private InformDurable(TxnId txnId, Route<?> scope, long waitForEpoch, @Nullable
         this.durability = durability;
     }
 
+    public static void informHome(Node node, Topologies any, TxnId txnId, Route<?> route, Timestamp executeAt)
+    {
+        long homeEpoch = txnId.epoch();
+        Topology homeEpochTopology = any.forEpoch(homeEpoch);
+        int homeShardIndex = homeEpochTopology.indexForKey(route.homeKey());
+        if (homeShardIndex < 0)
+        {
+            homeEpochTopology = node.topology().globalForEpoch(homeEpoch);
+            homeShardIndex = homeEpochTopology.indexForKey(route.homeKey());
+        }
+
+        Shard homeShard = homeEpochTopology.get(homeShardIndex);
+        Topologies homeTopology = new Topologies.Single(any, new Topology(homeEpoch, homeShard));
+        node.send(homeShard.nodes, to -> new InformDurable(to, homeTopology, route.homeKeyOnlyRoute(), txnId, executeAt, Majority));

Review Comment:
   Should `Majority` be hard-coded, or should it be passed as a param? What
   about renaming the method to `informHomeMajority`?



##########
accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java:
##########
@@ -310,14 +302,6 @@ private static class AsyncInclusiveSyncPointAdapter<U extends Unseekable> extend
             protected AsyncInclusiveSyncPointAdapter() {
                 super();
             }
-
-            @Override
-            protected void addOrExecuteCallback(ExecuteBlocking<U> execute, BiConsumer<? super SyncPoint<U>, Throwable> callback)
-            {
-                // If this is the async adapter then we want to invoke the callback immediately

Review Comment:
   What is the reason for making this async rather than immediate?



##########
accord-core/src/main/java/accord/messages/InformDurable.java:
##########
@@ -63,6 +67,22 @@ private InformDurable(TxnId txnId, Route<?> scope, long waitForEpoch, @Nullable
         this.durability = durability;
     }
 
+    public static void informHome(Node node, Topologies any, TxnId txnId, Route<?> route, Timestamp executeAt)
+    {
+        long homeEpoch = txnId.epoch();
+        Topology homeEpochTopology = any.forEpoch(homeEpoch);
+        int homeShardIndex = homeEpochTopology.indexForKey(route.homeKey());
+        if (homeShardIndex < 0)
+        {
+            homeEpochTopology = node.topology().globalForEpoch(homeEpoch);
+            homeShardIndex = homeEpochTopology.indexForKey(route.homeKey());
+        }
+
+        Shard homeShard = homeEpochTopology.get(homeShardIndex);
+        Topologies homeTopology = new Topologies.Single(any, new Topology(homeEpoch, homeShard));
+        node.send(homeShard.nodes, to -> new InformDurable(to, homeTopology, route.homeKeyOnlyRoute(), txnId, executeAt, Majority));

Review Comment:
   In `MaybeRecover` we call `accord.primitives.Status.Durability#isDurable`,
   which is

   ```
   public boolean isDurable()
   {
       return this == Majority || this == Universal;
   }
   ```

   so based on what we know, the durability "could" be `Universal` as well?
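
To make the concern concrete, here is a minimal sketch under stated assumptions. The `Durability` enum below is a simplified stand-in (the real `accord.primitives.Status.Durability` has more states), and `reportedHardCoded` / `reportedPassedThrough` are hypothetical helpers that only model which durability the home shard would be told about. They are not part of the patch.

```java
// Simplified stand-in for accord.primitives.Status.Durability (assumption: the real enum has more states).
enum Durability
{
    NotDurable, Majority, Universal;

    boolean isDurable()
    {
        return this == Majority || this == Universal;
    }
}

final class DurabilitySketch
{
    // Hypothetical model of informHome as written in the patch: always reports Majority.
    static Durability reportedHardCoded(Durability known)
    {
        return Durability.Majority;
    }

    // Hypothetical model of informHome with durability passed as a parameter.
    static Durability reportedPassedThrough(Durability known)
    {
        return known;
    }

    public static void main(String[] args)
    {
        for (Durability known : Durability.values())
        {
            // MaybeRecover only informs the home shard when isDurable() is true
            if (!known.isDurable())
                continue;
            System.out.println(known + " -> hard-coded: " + reportedHardCoded(known)
                               + ", passed through: " + reportedPassedThrough(known));
        }
    }
}
```

If reporting `Majority` for a transaction known to be `Universal` is harmless, the hard-coded value is fine; the sketch only shows that the two variants differ in that one case.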



##########
accord-core/src/main/java/accord/messages/ReadData.java:
##########
@@ -513,10 +513,17 @@ public enum CommitOrReadNack implements ReadData.ReadReply
          * The commit has been rejected due to stale ballot.
          */
         Rejected("CommitRejected"),
+
         /**
          * Either not committed, or not stable
          */
         Insufficient("CommitInsufficient"),
+
+        /**
+         * PreApplied successfully, but the request is blocking so waiting to reply
+         */
+        Waiting("ApplyWaiting"),

Review Comment:
   I don't see this sent anywhere in this patch; is it sent from C*?



##########
accord-core/src/main/java/accord/coordinate/MaybeRecover.java:
##########
@@ -108,7 +108,7 @@ protected void onDone(Success success, Throwable fail)
                     if (hasMadeProgress(full))
                     {
                         if (full.durability.isDurable())
-                            node.send(topologies.forEpoch(txnId.epoch()).forKey(route.homeKey()).nodes, to -> new InformDurable(to, topologies, route, txnId, full.executeAtIfKnown(), full.durability));
+                            InformDurable.informHome(node, topologies, txnId, route, full.executeAtIfKnown());

Review Comment:
   I commented on this in `InformDurable.informHome`: the durability "could"
   be `Universal` here, yet that method hard-codes `Majority`.



##########
accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java:
##########
@@ -219,18 +224,17 @@ public void stabilise(Node node, Topologies any, FullRoute<?> route, Ballot ball
             @Override
             public void execute(Node node, Topologies any, FullRoute<?> route, ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? super SyncPoint<U>, Throwable> callback)
             {
-                Topologies all = forExecution(node, route, txnId, executeAt, deps);
-                persist(node, all, route, txnId, txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null), callback);
+                persist(node, null, route, txnId, txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null), callback);

Review Comment:
   Just double-checking: this is because every impl recalculates `all`, so
   passing `null` just avoids doing the work twice?



##########
accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java:
##########
@@ -219,18 +224,17 @@ public void stabilise(Node node, Topologies any, FullRoute<?> route, Ballot ball
             @Override
             public void execute(Node node, Topologies any, FullRoute<?> route, ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? super SyncPoint<U>, Throwable> callback)
             {
-                Topologies all = forExecution(node, route, txnId, executeAt, deps);
-                persist(node, all, route, txnId, txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null), callback);
+                persist(node, null, route, txnId, txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null), callback);
             }
 
             @Override
-            public void persist(Node node, Topologies any, FullRoute<?> route, Participants<?> participants, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, BiConsumer<? super SyncPoint<U>, Throwable> callback)
+            public void persist(Node node, Topologies ignore, FullRoute<?> route, Participants<?> participants, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, BiConsumer<? super SyncPoint<U>, Throwable> callback)
             {
                 Topologies all = forExecution(node, route, txnId, executeAt, deps);
 
                 invokeSuccess(node, route, txnId, txn, deps, callback);
                 new PersistSyncPoint(node, all, txnId, route, txn, executeAt, deps, writes, result)
-                    .start(Apply.FACTORY, Maximal, any, writes, result);
+                .start(Apply.FACTORY, Maximal, all, writes, result);

Review Comment:
   for me: lgtm



##########
accord-core/src/main/java/accord/coordinate/Persist.java:
##########
@@ -86,12 +84,7 @@ public void onSuccess(Id from, ApplyReply reply)
                     if (!isDone)
                     {
                         isDone = true;
-                        Topologies topologies = tracker.topologies();
-                        Topology topology = topologies.forEpoch(txnId.epoch());
-                        int homeShardIndex = topology.indexForKey(route.homeKey());
-                        // we can persist only partially if some shards are already completed; in this case the home shard may not participate
-                        if (homeShardIndex >= 0)
-                            node.send(topology.get(homeShardIndex).nodes, to -> new InformDurable(to, topologies, route, txnId, executeAt, Majority));
+                        InformDurable.informHome(node, topologies, txnId, route, executeAt);

Review Comment:
   We will now always send, since we fall back to the global topology, yet the
   removed code comment implies this isn't safe?
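
A rough sketch of the behavioural difference, under stated assumptions. `Shard` here is a hypothetical stand-in that owns an integer key range, and `indexForKey` is a stand-in that returns a negative index when no shard covers the key (as the `homeShardIndex < 0` check in the patch suggests). None of this is the real Accord API.

```java
import java.util.Arrays;
import java.util.List;

final class HomeShardLookupSketch
{
    // Hypothetical stand-in for a shard: owns keys in [start, end).
    static final class Shard
    {
        final int start, end;

        Shard(int start, int end)
        {
            this.start = start;
            this.end = end;
        }
    }

    // Stand-in for Topology.indexForKey: negative when no shard in this topology covers the key.
    static int indexForKey(List<Shard> topology, int key)
    {
        for (int i = 0; i < topology.size(); i++)
        {
            Shard s = topology.get(i);
            if (key >= s.start && key < s.end)
                return i;
        }
        return -1;
    }

    // Models the removed Persist code: no message when the home shard is not in the tracked topology.
    static Shard oldTarget(List<Shard> tracked, int homeKey)
    {
        int i = indexForKey(tracked, homeKey);
        return i >= 0 ? tracked.get(i) : null;
    }

    // Models informHome: falls back to the global topology (assumed here to cover the home key).
    static Shard newTarget(List<Shard> tracked, List<Shard> global, int homeKey)
    {
        List<Shard> topology = tracked;
        int i = indexForKey(topology, homeKey);
        if (i < 0)
        {
            topology = global;
            i = indexForKey(topology, homeKey);
        }
        return topology.get(i);
    }

    public static void main(String[] args)
    {
        List<Shard> global = Arrays.asList(new Shard(0, 10), new Shard(10, 20));
        List<Shard> tracked = Arrays.asList(new Shard(10, 20)); // home shard [0, 10) did not participate
        int homeKey = 5;
        System.out.println("old sends: " + (oldTarget(tracked, homeKey) != null));
        System.out.println("new sends: " + (newTarget(tracked, global, homeKey) != null));
    }
}
```

In the partial-persist case the old code stays silent while the new helper still informs the home shard, which is the change in behaviour being asked about.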



##########
accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java:
##########
@@ -195,6 +257,11 @@ protected void onSuccess()
         trySuccess(syncPoint);
     }
 
+    protected void sendApply(Node.Id to)

Review Comment:
   This appears to be dead code; is it used by C*?


