dcapwell commented on code in PR #3481:
URL: https://github.com/apache/cassandra/pull/3481#discussion_r1733548717
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -833,6 +846,143 @@ public Id nodeId()
return node.id();
}
+ /**
+  * Blocking variant of {@link #loadDebug(TxnId)}: gathers the debug state of {@code txnId} from the
+  * command stores and waits for the result.
+  *
+  * @param txnId the transaction to inspect
+  * @return one {@link StoreTxnState} per command store
+  * @throws UncheckedInterruptedException if interrupted while waiting; the interrupt flag is restored first
+  * @throws RuntimeException wrapping the underlying failure if loading the state failed
+  */
+ @Override
+ public List<StoreTxnState> debug(TxnId txnId)
+ {
+     AsyncChain<List<StoreTxnState>> states = loadDebug(txnId);
+     try
+     {
+         return AsyncChains.getBlocking(states);
+     }
+     catch (InterruptedException e)
+     {
+         // Restore the interrupt flag so code further up the stack can still observe the interruption.
+         Thread.currentThread().interrupt();
+         throw new UncheckedInterruptedException(e);
+     }
+     catch (ExecutionException e)
+     {
+         // Unwrap the ExecutionException but keep the real failure as the cause.
+         throw new RuntimeException(e.getCause());
+     }
+ }
+
+ /**
+  * Asynchronously gathers debug state for {@code original} from every command store on this node,
+  * one entry per store. Completes immediately with an empty list when there are no command stores.
+  */
+ public AsyncChain<List<StoreTxnState>> loadDebug(TxnId original)
+ {
+     CommandStores stores = node.commandStores();
+     if (stores.count() == 0)
+         return AsyncChains.success(Collections.emptyList());
+
+     int[] storeIds = stores.ids();
+     List<AsyncChain<StoreTxnState>> perStore = new ArrayList<>(storeIds.length);
+     for (int i = 0; i < storeIds.length; i++)
+         perStore.add(loadDebug(original, stores.forId(storeIds[i])));
+     return AsyncChains.all(perStore);
+ }
+
+ /**
+  * Builds the debug state of {@code txnId} as seen by the single command store {@code store}.
+  */
+ private AsyncChain<StoreTxnState> loadDebug(TxnId txnId, CommandStore store)
+ {
+     StoreTxnState.Builder builder = new StoreTxnState.Builder(store.id());
+     AsyncChain<Void> populated = populate(builder, store, txnId);
+     return populated.map(ignore -> builder.build());
+ }
+
+ /**
+  * Submits to {@code store} (with {@code txnId} preloaded) the population of {@code state} starting at
+  * {@code txnId}. Any follow-up work returned by the in-store populate is flattened into the result.
+  */
+ private static AsyncChain<Void> populate(StoreTxnState.Builder state, CommandStore store, TxnId txnId)
+ {
+     AsyncChain<AsyncChain<Void>> nested = store.submit(PreLoadContext.contextFor(txnId), safeStore -> {
+         AsyncChain<Void> followUp = populate(state, (AccordSafeCommandStore) safeStore, txnId);
+         if (followUp != null)
+             return followUp;
+         // nothing further to wait for
+         return AsyncChains.success(null);
+     });
+     return nested.flatMap(Function.identity());
+ }
+
+ /**
+  * Submits to {@code commandStore} the lookup of which txn blocks {@code txnId} on key {@code blockedBy}.
+  * Both {@code txnId} and the key's command history ({@code KeyHistory.COMMANDS}) are preloaded; any
+  * follow-up work returned by the in-store populate is flattened into the result.
+  */
+ private static AsyncChain<Void> populate(StoreTxnState.Builder state,
+                                          CommandStore commandStore,
+                                          PartitionKey blockedBy,
+                                          TxnId txnId,
+                                          Timestamp executeAt)
+ {
+     AsyncChain<AsyncChain<Void>> nested = commandStore.submit(PreLoadContext.contextFor(txnId, Keys.of(blockedBy), KeyHistory.COMMANDS), safeStore -> {
+         AsyncChain<Void> followUp = populate(state, (AccordSafeCommandStore) safeStore, blockedBy, txnId, executeAt);
+         if (followUp != null)
+             return followUp;
+         // nothing further to wait for
+         return AsyncChains.success(null);
+     });
+     return nested.flatMap(Function.identity());
+ }
+
+ /**
+  * Records the state of {@code txnId} (which must already be loaded in {@code safeStore}) into
+  * {@code state}, then follows whatever it is blocked on: other txns ({@code blockedBy}) and keys
+  * ({@code blockedByKey}). Dependencies already loaded in this store are handled synchronously by
+  * recursion; the rest are scheduled on the command store so they are loaded first.
+  *
+  * @return a chain completing when all scheduled follow-up work is done, or {@code null} when there is
+  *         nothing to wait for (missing/Uninitialised command, not blocked, or every dependency was
+  *         already known or handled synchronously)
+  */
+ @Nullable
+ private static AsyncChain<Void> populate(StoreTxnState.Builder state,
AccordSafeCommandStore safeStore, TxnId txnId)
+ {
+ AccordSafeCommand safeCommand = safeStore.getIfLoaded(txnId);
+ Invariants.nonNull(safeCommand, "Txn %s is not in the cache", txnId);
+ // a missing or Uninitialised command has no state to record
+ if (safeCommand.current() == null ||
safeCommand.current().saveStatus() == SaveStatus.Uninitialised)
+ return null;
+ // record this command, and stop here if it is not blocked on anything
+ StoreTxnState.TxnState cmdTxnState = populate(state,
safeCommand.current());
+ if (cmdTxnState.notBlocked())
+ return null;
+ //TODO (safety): check depth
+ // NOTE(review): the recursion below is unbounded; revisits are only prevented by state.knows(...) and
+ // state.keys. A txn scheduled asynchronously is presumably not "known" until its submit runs, so
+ // siblings may schedule the same load more than once -- confirm whether the builder tolerates that.
+ List<AsyncChain<Void>> chains = new ArrayList<>();
+ for (TxnId blockedBy : cmdTxnState.blockedBy)
+ {
+ if (state.knows(blockedBy)) continue;
+ // need to fetch the state: synchronously if it is already loaded in this store, else via the command store
+ if (safeStore.getIfLoaded(blockedBy) != null)
+ {
+ AsyncChain<Void> chain = populate(state, safeStore, blockedBy);
+ if (chain != null)
+ chains.add(chain);
+ }
+ else
+ {
+ // go fetch it
+ chains.add(populate(state, safeStore.commandStore(),
blockedBy));
+ }
+ }
+ // for each key this command waits on, find which txn blocks it there (relative to this command's executeAt)
+ for (PartitionKey blockedBy : cmdTxnState.blockedByKey)
+ {
+ if (state.keys.containsKey(blockedBy)) continue;
+ if (safeStore.getCommandsForKeyIfLoaded(blockedBy) != null)
+ {
+ AsyncChain<Void> chain = populate(state, safeStore, blockedBy,
txnId, safeCommand.current().executeAt());
+ if (chain != null)
+ chains.add(chain);
+ }
+ else
+ {
+ // go fetch it
+ chains.add(populate(state, safeStore.commandStore(),
blockedBy, txnId, safeCommand.current().executeAt()));
+ }
+ }
+ // null signals "nothing asynchronous to wait for" to the callers
+ if (chains.isEmpty())
+ return null;
+ return AsyncChains.all(chains).map(ignore -> null);
+ }
+
+ /**
+  * Records in {@code state.keys} which txn blocks {@code txnId} (executing at {@code executeAt}) on key
+  * {@code pk}, then follows that blocking txn: synchronously if it is already loaded in {@code safeStore},
+  * otherwise by scheduling a load on the command store.
+  * <p>
+  * The commands-for-key of {@code pk} must already be loaded in {@code safeStore}.
+  *
+  * @return follow-up work to wait for, or {@code null} if there is nothing further to do
+  */
+ @Nullable
+ private static AsyncChain<Void> populate(StoreTxnState.Builder state,
+                                          AccordSafeCommandStore safeStore,
+                                          PartitionKey pk,
+                                          TxnId txnId,
+                                          Timestamp executeAt)
+ {
+     AccordSafeCommandsForKey commandsForKey = safeStore.getCommandsForKeyIfLoaded(pk);
+     Invariants.nonNull(commandsForKey, "Key %s is not in the cache", pk);
+     TxnId blocking = commandsForKey.current().blockedOnTxnId(txnId, executeAt);
+     // CommandsForKey may hand back its TxnInfo subtype; store and look up the plain TxnId instead
+     if (blocking instanceof CommandsForKey.TxnInfo)
+         blocking = ((CommandsForKey.TxnInfo) blocking).plainTxnId();
+     state.keys.put(pk, blocking);
+     // NOTE(review): presumably blockedOnTxnId returns null when nothing on this key blocks txnId -- confirm.
+     // Without this guard a null id would be passed to getIfLoaded/contextFor below.
+     if (blocking == null)
+         return null;
+     if (state.txns.containsKey(blocking))
+         return null;
+     if (safeStore.getIfLoaded(blocking) != null)
+         return populate(state, safeStore, blocking);
+     return populate(state, safeStore.commandStore(), blocking);
+ }
+
+ private static StoreTxnState.TxnState populate(StoreTxnState.Builder
state, Command cmd)
+ {
+ StoreTxnState.Builder.TxnBuilder cmdTxnState = state.txn(cmd.txnId(),
cmd.executeAt(), cmd.saveStatus());
+ if (!cmd.hasBeen(Status.Applied) && cmd.isCommitted())
+ {
+ // check blocking state
+ Command.WaitingOn waitingOn = cmd.asCommitted().waitingOn();
+ waitingOn.waitingOn.reverseForEach(null, null, null, null, (i1,
i2, i3, i4, i) -> {
+ if (i < waitingOn.txnIdCount())
+ {
+ // blocked on txn
+ cmdTxnState.blockedBy.add(waitingOn.txnId(i));
Review Comment:
> We should really only list one TxnId that we are blocking on, or at least
> we should filter the resulting list to those that are actually not Applied

We can definitely filter for `not Applied`. My thinking is that we first work out what we
think we are blocked on, then reify the state; we can filter while loading. This also helps
avoid false positives in the other direction: if we only list 1 txn and, when we load it,
it's Applied, we would report waiting on 0 txns (when we are actually waiting on 100).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]