belliottsmith commented on code in PR #7:
URL: https://github.com/apache/cassandra-accord/pull/7#discussion_r1011794157


##########
accord-core/src/main/java/accord/impl/SimpleProgressLog.java:
##########
@@ -668,242 +710,123 @@ State recordExecute(TxnId txnId)
         {
             State state = stateMap.get(txnId);
             if (state != null && state.blockingState != null)
-                state.blockingState.recordExecute();
+                state.blockingState.recordApply();
             return state;
         }
 
-        private void ensureSafeOrAtLeast(Command command, boolean 
isProgressShard, boolean isHomeShard, LocalStatus newStatus, Progress 
newProgress)
+        private void ensureSafeOrAtLeast(Command command, ProgressShard shard, 
CoordinateStatus newStatus, Progress newProgress)
         {
+            Preconditions.checkState(shard != Unsure);
+
             State state = null;
             assert newStatus.isAtMost(ReadyToExecute);
-            if (newStatus.isAtLeast(LocalStatus.Committed))
+            if (newStatus.isAtLeast(CoordinateStatus.Committed))
                 state = recordCommit(command.txnId());
 
-            if (isProgressShard)
+            if (shard.isProgress())
             {
                 state = ensure(command.txnId(), state);
 
-                if (isHomeShard) state.ensureAtLeast(command, newStatus, 
newProgress, node);
+                if (shard.isHome()) state.ensureAtLeast(command, newStatus, 
newProgress);
                 else ensure(command.txnId()).ensureAtLeast(Safe);
             }
         }
 
         @Override
-        public void accept(Command command, boolean isProgressShard, boolean 
isHomeShard)
+        public void accept(Command command, ProgressShard shard)
         {
-            ensureSafeOrAtLeast(command, isProgressShard, isHomeShard, 
Uncommitted, Expected);
+            ensureSafeOrAtLeast(command, shard, Uncommitted, Expected);
         }
 
         @Override
-        public void commit(Command command, boolean isProgressShard, boolean 
isHomeShard)
+        public void commit(Command command, ProgressShard shard)
         {
-            ensureSafeOrAtLeast(command, isProgressShard, isHomeShard, 
LocalStatus.Committed, NoneExpected);
+            ensureSafeOrAtLeast(command, shard, CoordinateStatus.Committed, 
NoneExpected);
         }
 
         @Override
-        public void readyToExecute(Command command, boolean isProgressShard, 
boolean isHomeShard)
+        public void readyToExecute(Command command, ProgressShard shard)
         {
-            ensureSafeOrAtLeast(command, isProgressShard, isHomeShard, 
LocalStatus.ReadyToExecute, Expected);
+            ensureSafeOrAtLeast(command, shard, 
CoordinateStatus.ReadyToExecute, Expected);
         }
 
         @Override
-        public void execute(Command command, boolean isProgressShard, boolean 
isHomeShard)
+        public void execute(Command command, ProgressShard shard)
         {
-            State state = recordExecute(command.txnId());
-
-            if (isProgressShard)
-            {
-                state = ensure(command.txnId(), state);
-
-                if (isHomeShard) state.home().executed(node, command);
-                else ensure(command.txnId()).ensureAtLeast(Safe);
-            }
+            recordExecute(command.txnId());
+            // this is the home shard's state ONLY, so we don't know it is 
fully durable locally
+            ensureSafeOrAtLeast(command, shard, 
CoordinateStatus.ReadyToExecute, Expected);
         }
 
         @Override
-        public void invalidate(Command command, boolean isProgressShard, 
boolean isHomeShard)
+        public void invalidate(Command command, ProgressShard shard)
         {
             State state = recordExecute(command.txnId());
 
-            if (isProgressShard)
+            Preconditions.checkState(shard == Home || state == null || 
state.coordinateState == null);
+
+            // note: we permit Unsure here, so we check if we have any local 
home state
+            if (shard.isProgress())
             {
                 state = ensure(command.txnId(), state);
 
-                if (isHomeShard) state.ensureAtLeast(command, 
LocalStatus.Done, Done, node);
+                if (shard.isHome()) state.ensureAtLeast(command, 
CoordinateStatus.Done, Done);
                 else ensure(command.txnId()).ensureAtLeast(Safe);
             }
         }
 
         @Override
-        public void executedOnAllShards(Command command, Set<Id> persistedOn)
-        {
-            State state = ensure(command.txnId());
-            state.home().executedOnAllShards(node, command, persistedOn);
-        }
-
-        @Override
-        public void waiting(PartialCommand blockedByCommand, Keys someKeys)
-        {
-            TxnId blockedBy = blockedByCommand.txnId();
-            if (!blockedByCommand.hasBeen(Executed))
-                ensure(blockedBy).recordBlocking(blockedByCommand, someKeys);
-        }
-    }
-
-    @Override
-    public void run()
-    {
-        for (Instance instance : instances)
-        {
-            // TODO: we want to be able to poll others about pending 
dependencies to check forward progress,
-            //       as we don't know all dependencies locally (or perhaps 
any, at execution time) so we may
-            //       begin expecting forward progress too early
-            // state map may be updated during iteration, so need to clone the 
values set
-            new ArrayList<>(instance.stateMap.values()).forEach(state -> 
state.update(node));
-        }
-    }
-
-    static class CoordinateApplyAndCheck extends AsyncFuture<Void> implements 
Callback<ApplyAndCheckOk>
-    {
-        final TxnId txnId;
-        final HomeState state;
-        final Set<Id> waitingOnResponses;
-
-        static Future<Void> applyAndCheck(Node node, TxnId txnId, Command 
command, HomeState state)
-        {
-            CoordinateApplyAndCheck coordinate = new 
CoordinateApplyAndCheck(txnId, state);
-            Topologies topologies = 
node.topology().preciseEpochs(command.txn(), command.executeAt().epoch);
-            state.globalNotPersisted.retainAll(topologies.nodes()); // we 
might have had some nodes from older shards that are now redundant
-            node.send(state.globalNotPersisted, id -> new ApplyAndCheck(id, 
topologies,
-                                                                        
command.txnId(), command.txn(), command.homeKey(),
-                                                                        
command.savedDeps(), command.executeAt(),
-                                                                        
command.writes(), command.result(),
-                                                                        
state.globalNotPersisted),
-                      coordinate);
-            return coordinate;
-        }
-
-        CoordinateApplyAndCheck(TxnId txnId, HomeState state)
+        public void durableLocal(TxnId txnId)
         {
-            this.txnId = txnId;
-            this.state = state;
-            this.waitingOnResponses = new HashSet<>(state.globalNotPersisted);
+            State state = ensure(txnId);
+            state.global().durableLocal(node);
         }
 
         @Override
-        public void onSuccess(Id from, ApplyAndCheckOk response)
+        public void durable(Command command, @Nullable Set<Id> persistedOn)

Review Comment:
   Maybe there's some improvements to make again to terminology, but durable is 
taken to mean reached our global durability requirements. Similarly, 
`Durability.Durable` refers to this global durability, but offers a lower 
`Local` level of _durability_.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to