dcapwell commented on code in PR #3481:
URL: https://github.com/apache/cassandra/pull/3481#discussion_r1733548717


##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -833,6 +846,143 @@ public Id nodeId()
         return node.id();
     }
 
+    @Override
+    public List<StoreTxnState> debug(TxnId txnId)
+    {
+        AsyncChain<List<StoreTxnState>> states = loadDebug(txnId);
+        try
+        {
+            return AsyncChains.getBlocking(states);
+        }
+        catch (InterruptedException e)
+        {
+            throw new UncheckedInterruptedException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e.getCause());
+        }
+    }
+
+    public AsyncChain<List<StoreTxnState>> loadDebug(TxnId original)
+    {
+        CommandStores commandStores = node.commandStores();
+        if (commandStores.count() == 0)
+            return AsyncChains.success(Collections.emptyList());
+        int[] ids = commandStores.ids();
+        List<AsyncChain<StoreTxnState>> chains = new ArrayList<>(ids.length);
+        for (int id : ids)
+            chains.add(loadDebug(original, commandStores.forId(id)));
+        return AsyncChains.all(chains);
+    }
+
+    private AsyncChain<StoreTxnState> loadDebug(TxnId txnId, CommandStore 
store)
+    {
+        StoreTxnState.Builder state = new StoreTxnState.Builder(store.id());
+        return populate(state, store, txnId).map(ignore -> state.build());
+    }
+
+    private static AsyncChain<Void> populate(StoreTxnState.Builder state, 
CommandStore store, TxnId txnId)
+    {
+        AsyncChain<AsyncChain<Void>> submit = 
store.submit(PreLoadContext.contextFor(txnId), in -> {
+            AsyncChain<Void> chain = populate(state, (AccordSafeCommandStore) 
in, txnId);
+            return chain == null ? AsyncChains.success(null) : chain;
+        });
+        return submit.flatMap(Function.identity());
+    }
+
+    private static AsyncChain<Void> populate(StoreTxnState.Builder state, 
CommandStore commandStore, PartitionKey blockedBy, TxnId txnId, Timestamp 
executeAt)
+    {
+        AsyncChain<AsyncChain<Void>> submit = 
commandStore.submit(PreLoadContext.contextFor(txnId, Keys.of(blockedBy), 
KeyHistory.COMMANDS), in -> {
+            AsyncChain<Void> chain = populate(state, (AccordSafeCommandStore) 
in, blockedBy, txnId, executeAt);
+            return chain == null ? AsyncChains.success(null) : chain;
+        });
+        return submit.flatMap(Function.identity());
+    }
+
+    @Nullable
+    private static AsyncChain<Void> populate(StoreTxnState.Builder state, 
AccordSafeCommandStore safeStore, TxnId txnId)
+    {
+        AccordSafeCommand safeCommand = safeStore.getIfLoaded(txnId);
+        Invariants.nonNull(safeCommand, "Txn %s is not in the cache", txnId);
+        if (safeCommand.current() == null || 
safeCommand.current().saveStatus() == SaveStatus.Uninitialised)
+            return null;
+        StoreTxnState.TxnState cmdTxnState = populate(state, 
safeCommand.current());
+        if (cmdTxnState.notBlocked())
+            return null;
+        //TODO (safety): check depth
+        List<AsyncChain<Void>> chains = new ArrayList<>();
+        for (TxnId blockedBy : cmdTxnState.blockedBy)
+        {
+            if (state.knows(blockedBy)) continue;
+            // need to fetch the state
+            if (safeStore.getIfLoaded(blockedBy) != null)
+            {
+                AsyncChain<Void> chain = populate(state, safeStore, blockedBy);
+                if (chain != null)
+                    chains.add(chain);
+            }
+            else
+            {
+                // go fetch it
+                chains.add(populate(state, safeStore.commandStore(), 
blockedBy));
+            }
+        }
+        for (PartitionKey blockedBy : cmdTxnState.blockedByKey)
+        {
+            if (state.keys.containsKey(blockedBy)) continue;
+            if (safeStore.getCommandsForKeyIfLoaded(blockedBy) != null)
+            {
+                AsyncChain<Void> chain = populate(state, safeStore, blockedBy, 
txnId, safeCommand.current().executeAt());
+                if (chain != null)
+                    chains.add(chain);
+            }
+            else
+            {
+                // go fetch it
+                chains.add(populate(state, safeStore.commandStore(), 
blockedBy, txnId, safeCommand.current().executeAt()));
+            }
+        }
+        if (chains.isEmpty())
+            return null;
+        return AsyncChains.all(chains).map(ignore -> null);
+    }
+
+    private static AsyncChain<Void> populate(StoreTxnState.Builder state, 
AccordSafeCommandStore safeStore, PartitionKey pk, TxnId txnId, Timestamp 
executeAt)
+    {
+        AccordSafeCommandsForKey commandsForKey = 
safeStore.getCommandsForKeyIfLoaded(pk);
+        TxnId blocking = commandsForKey.current().blockedOnTxnId(txnId, 
executeAt);
+        if (blocking instanceof CommandsForKey.TxnInfo)
+            blocking = ((CommandsForKey.TxnInfo) blocking).plainTxnId();
+        state.keys.put(pk, blocking);
+        if (state.txns.containsKey(blocking)) return null;
+        if (safeStore.getIfLoaded(blocking) != null) return populate(state, 
safeStore, blocking);
+        return populate(state, safeStore.commandStore(), blocking);
+    }
+
+    private static StoreTxnState.TxnState populate(StoreTxnState.Builder 
state, Command cmd)
+    {
+        StoreTxnState.Builder.TxnBuilder cmdTxnState = state.txn(cmd.txnId(), 
cmd.executeAt(), cmd.saveStatus());
+        if (!cmd.hasBeen(Status.Applied) && cmd.isCommitted())
+        {
+            // check blocking state
+            Command.WaitingOn waitingOn = cmd.asCommitted().waitingOn();
+            waitingOn.waitingOn.reverseForEach(null, null, null, null, (i1, 
i2, i3, i4, i) -> {
+                if (i < waitingOn.txnIdCount())
+                {
+                    // blocked on txn
+                    cmdTxnState.blockedBy.add(waitingOn.txnId(i));

Review Comment:
   > We should really only list one TxnId that we are blocking on, or at least 
we should filter the resulting list to those that are actually not Applied
   
   We can definitely filter for `not Applied`. My thinking is we figure out what we 
think, then reify the state; we can filter while loading. This can help 
avoid false positives in the other direction: if we only list 1 txn and, when we 
load it, it's Applied, we would report waiting on 0 txns while actually waiting on 100.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to