belliottsmith commented on code in PR #7:
URL: https://github.com/apache/cassandra-accord/pull/7#discussion_r1011791935


##########
accord-core/src/main/java/accord/impl/SimpleProgressLog.java:
##########
@@ -668,242 +710,123 @@ State recordExecute(TxnId txnId)
         {
             State state = stateMap.get(txnId);
             if (state != null && state.blockingState != null)
-                state.blockingState.recordExecute();
+                state.blockingState.recordApply();
             return state;
         }
 
-        private void ensureSafeOrAtLeast(Command command, boolean 
isProgressShard, boolean isHomeShard, LocalStatus newStatus, Progress 
newProgress)
+        private void ensureSafeOrAtLeast(Command command, ProgressShard shard, 
CoordinateStatus newStatus, Progress newProgress)
         {
+            Preconditions.checkState(shard != Unsure);
+
             State state = null;
             assert newStatus.isAtMost(ReadyToExecute);
-            if (newStatus.isAtLeast(LocalStatus.Committed))
+            if (newStatus.isAtLeast(CoordinateStatus.Committed))
                 state = recordCommit(command.txnId());
 
-            if (isProgressShard)
+            if (shard.isProgress())
             {
                 state = ensure(command.txnId(), state);
 
-                if (isHomeShard) state.ensureAtLeast(command, newStatus, 
newProgress, node);
+                if (shard.isHome()) state.ensureAtLeast(command, newStatus, 
newProgress);
                 else ensure(command.txnId()).ensureAtLeast(Safe);
             }
         }
 
         @Override
-        public void accept(Command command, boolean isProgressShard, boolean 
isHomeShard)
+        public void accept(Command command, ProgressShard shard)
         {
-            ensureSafeOrAtLeast(command, isProgressShard, isHomeShard, 
Uncommitted, Expected);
+            ensureSafeOrAtLeast(command, shard, Uncommitted, Expected);
         }
 
         @Override
-        public void commit(Command command, boolean isProgressShard, boolean 
isHomeShard)
+        public void commit(Command command, ProgressShard shard)
         {
-            ensureSafeOrAtLeast(command, isProgressShard, isHomeShard, 
LocalStatus.Committed, NoneExpected);
+            ensureSafeOrAtLeast(command, shard, CoordinateStatus.Committed, 
NoneExpected);
         }
 
         @Override
-        public void readyToExecute(Command command, boolean isProgressShard, 
boolean isHomeShard)
+        public void readyToExecute(Command command, ProgressShard shard)
         {
-            ensureSafeOrAtLeast(command, isProgressShard, isHomeShard, 
LocalStatus.ReadyToExecute, Expected);
+            ensureSafeOrAtLeast(command, shard, 
CoordinateStatus.ReadyToExecute, Expected);
         }
 
         @Override
-        public void execute(Command command, boolean isProgressShard, boolean 
isHomeShard)
+        public void execute(Command command, ProgressShard shard)
         {
-            State state = recordExecute(command.txnId());
-
-            if (isProgressShard)
-            {
-                state = ensure(command.txnId(), state);
-
-                if (isHomeShard) state.home().executed(node, command);
-                else ensure(command.txnId()).ensureAtLeast(Safe);
-            }
+            recordExecute(command.txnId());
+            // this is the home shard's state ONLY, so we don't know it is 
fully durable locally
+            ensureSafeOrAtLeast(command, shard, 
CoordinateStatus.ReadyToExecute, Expected);
         }
 
         @Override
-        public void invalidate(Command command, boolean isProgressShard, 
boolean isHomeShard)
+        public void invalidate(Command command, ProgressShard shard)
         {
             State state = recordExecute(command.txnId());
 
-            if (isProgressShard)
+            Preconditions.checkState(shard == Home || state == null || 
state.coordinateState == null);
+
+            // note: we permit Unsure here, so we check if we have any local 
home state
+            if (shard.isProgress())
             {
                 state = ensure(command.txnId(), state);
 
-                if (isHomeShard) state.ensureAtLeast(command, 
LocalStatus.Done, Done, node);
+                if (shard.isHome()) state.ensureAtLeast(command, 
CoordinateStatus.Done, Done);
                 else ensure(command.txnId()).ensureAtLeast(Safe);
             }
         }
 
         @Override
-        public void executedOnAllShards(Command command, Set<Id> persistedOn)
-        {
-            State state = ensure(command.txnId());
-            state.home().executedOnAllShards(node, command, persistedOn);
-        }
-
-        @Override
-        public void waiting(PartialCommand blockedByCommand, Keys someKeys)
-        {
-            TxnId blockedBy = blockedByCommand.txnId();
-            if (!blockedByCommand.hasBeen(Executed))
-                ensure(blockedBy).recordBlocking(blockedByCommand, someKeys);
-        }
-    }
-
-    @Override
-    public void run()
-    {
-        for (Instance instance : instances)
-        {
-            // TODO: we want to be able to poll others about pending 
dependencies to check forward progress,
-            //       as we don't know all dependencies locally (or perhaps 
any, at execution time) so we may
-            //       begin expecting forward progress too early
-            // state map may be updated during iteration, so need to clone the 
values set
-            new ArrayList<>(instance.stateMap.values()).forEach(state -> 
state.update(node));
-        }
-    }
-
-    static class CoordinateApplyAndCheck extends AsyncFuture<Void> implements 
Callback<ApplyAndCheckOk>
-    {
-        final TxnId txnId;
-        final HomeState state;
-        final Set<Id> waitingOnResponses;
-
-        static Future<Void> applyAndCheck(Node node, TxnId txnId, Command 
command, HomeState state)
-        {
-            CoordinateApplyAndCheck coordinate = new 
CoordinateApplyAndCheck(txnId, state);
-            Topologies topologies = 
node.topology().preciseEpochs(command.txn(), command.executeAt().epoch);
-            state.globalNotPersisted.retainAll(topologies.nodes()); // we 
might have had some nodes from older shards that are now redundant
-            node.send(state.globalNotPersisted, id -> new ApplyAndCheck(id, 
topologies,
-                                                                        
command.txnId(), command.txn(), command.homeKey(),
-                                                                        
command.savedDeps(), command.executeAt(),
-                                                                        
command.writes(), command.result(),
-                                                                        
state.globalNotPersisted),
-                      coordinate);
-            return coordinate;
-        }
-
-        CoordinateApplyAndCheck(TxnId txnId, HomeState state)
+        public void durableLocal(TxnId txnId)
         {
-            this.txnId = txnId;
-            this.state = state;
-            this.waitingOnResponses = new HashSet<>(state.globalNotPersisted);
+            State state = ensure(txnId);
+            state.global().durableLocal(node);
         }
 
         @Override
-        public void onSuccess(Id from, ApplyAndCheckOk response)
+        public void durable(Command command, @Nullable Set<Id> persistedOn)
         {
-            state.globalNotPersisted.retainAll(response.notPersisted);
-            state.refreshGlobal(null, null, null, null);
+            State state = ensure(command.txnId());
+            if (!command.status().hasBeen(PreApplied))
+                state.recordBlocking(command.txnId(), Apply, 
command.someRoutingKeys());
+            state.local().durableGlobal();
+            state.global().durableGlobal(node, command, persistedOn);
         }
 
         @Override
-        public void onFailure(Id from, Throwable failure)
+        public void durable(TxnId txnId, RoutingKeys someKeys, ProgressShard 
shard)

Review Comment:
   We've been informed that the transaction is durable, i.e. a majority of 
replicas at all shards have recorded its outcome. So we can (and should) fetch 
the outcome locally - hence marking blocking, which is all that really does. 
Logically, our local state being up-to-date is now "blocked" on this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to