belliottsmith commented on code in PR #3481:
URL: https://github.com/apache/cassandra/pull/3481#discussion_r1732804588
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -833,6 +846,143 @@ public Id nodeId()
return node.id();
}
+ @Override
+ public List<StoreTxnState> debug(TxnId txnId)
+ {
+ AsyncChain<List<StoreTxnState>> states = loadDebug(txnId);
+ try
+ {
+ return AsyncChains.getBlocking(states);
+ }
+ catch (InterruptedException e)
+ {
+ throw new UncheckedInterruptedException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ public AsyncChain<List<StoreTxnState>> loadDebug(TxnId original)
+ {
+ CommandStores commandStores = node.commandStores();
+ if (commandStores.count() == 0)
+ return AsyncChains.success(Collections.emptyList());
+ int[] ids = commandStores.ids();
+ List<AsyncChain<StoreTxnState>> chains = new ArrayList<>(ids.length);
+ for (int id : ids)
+ chains.add(loadDebug(original, commandStores.forId(id)));
+ return AsyncChains.all(chains);
+ }
+
+ private AsyncChain<StoreTxnState> loadDebug(TxnId txnId, CommandStore
store)
+ {
+ StoreTxnState.Builder state = new StoreTxnState.Builder(store.id());
+ return populate(state, store, txnId).map(ignore -> state.build());
+ }
+
+ private static AsyncChain<Void> populate(StoreTxnState.Builder state,
CommandStore store, TxnId txnId)
+ {
+ AsyncChain<AsyncChain<Void>> submit =
store.submit(PreLoadContext.contextFor(txnId), in -> {
+ AsyncChain<Void> chain = populate(state, (AccordSafeCommandStore)
in, txnId);
+ return chain == null ? AsyncChains.success(null) : chain;
+ });
+ return submit.flatMap(Function.identity());
+ }
+
+ private static AsyncChain<Void> populate(StoreTxnState.Builder state,
CommandStore commandStore, PartitionKey blockedBy, TxnId txnId, Timestamp
executeAt)
+ {
+ AsyncChain<AsyncChain<Void>> submit =
commandStore.submit(PreLoadContext.contextFor(txnId, Keys.of(blockedBy),
KeyHistory.COMMANDS), in -> {
+ AsyncChain<Void> chain = populate(state, (AccordSafeCommandStore)
in, blockedBy, txnId, executeAt);
+ return chain == null ? AsyncChains.success(null) : chain;
+ });
+ return submit.flatMap(Function.identity());
+ }
+
+ @Nullable
+ private static AsyncChain<Void> populate(StoreTxnState.Builder state,
AccordSafeCommandStore safeStore, TxnId txnId)
+ {
+ AccordSafeCommand safeCommand = safeStore.getIfLoaded(txnId);
+ Invariants.nonNull(safeCommand, "Txn %s is not in the cache", txnId);
+ if (safeCommand.current() == null ||
safeCommand.current().saveStatus() == SaveStatus.Uninitialised)
+ return null;
+ StoreTxnState.TxnState cmdTxnState = populate(state,
safeCommand.current());
+ if (cmdTxnState.notBlocked())
+ return null;
+ //TODO (safety): check depth
+ List<AsyncChain<Void>> chains = new ArrayList<>();
+ for (TxnId blockedBy : cmdTxnState.blockedBy)
+ {
+ if (state.knows(blockedBy)) continue;
+ // need to fetch the state
+ if (safeStore.getIfLoaded(blockedBy) != null)
+ {
+ AsyncChain<Void> chain = populate(state, safeStore, blockedBy);
+ if (chain != null)
+ chains.add(chain);
+ }
+ else
+ {
+ // go fetch it
+ chains.add(populate(state, safeStore.commandStore(),
blockedBy));
+ }
+ }
+ for (PartitionKey blockedBy : cmdTxnState.blockedByKey)
+ {
+ if (state.keys.containsKey(blockedBy)) continue;
+ if (safeStore.getCommandsForKeyIfLoaded(blockedBy) != null)
+ {
+ AsyncChain<Void> chain = populate(state, safeStore, blockedBy,
txnId, safeCommand.current().executeAt());
+ if (chain != null)
+ chains.add(chain);
+ }
+ else
+ {
+ // go fetch it
+ chains.add(populate(state, safeStore.commandStore(),
blockedBy, txnId, safeCommand.current().executeAt()));
+ }
+ }
+ if (chains.isEmpty())
+ return null;
+ return AsyncChains.all(chains).map(ignore -> null);
+ }
+
+ private static AsyncChain<Void> populate(StoreTxnState.Builder state,
AccordSafeCommandStore safeStore, PartitionKey pk, TxnId txnId, Timestamp
executeAt)
+ {
+ AccordSafeCommandsForKey commandsForKey =
safeStore.getCommandsForKeyIfLoaded(pk);
+ TxnId blocking = commandsForKey.current().blockedOnTxnId(txnId,
executeAt);
+ if (blocking instanceof CommandsForKey.TxnInfo)
+ blocking = ((CommandsForKey.TxnInfo) blocking).plainTxnId();
+ state.keys.put(pk, blocking);
+ if (state.txns.containsKey(blocking)) return null;
+ if (safeStore.getIfLoaded(blocking) != null) return populate(state,
safeStore, blocking);
+ return populate(state, safeStore.commandStore(), blocking);
+ }
+
+ private static StoreTxnState.TxnState populate(StoreTxnState.Builder
state, Command cmd)
+ {
+ StoreTxnState.Builder.TxnBuilder cmdTxnState = state.txn(cmd.txnId(),
cmd.executeAt(), cmd.saveStatus());
+ if (!cmd.hasBeen(Status.Applied) && cmd.isCommitted())
+ {
+ // check blocking state
+ Command.WaitingOn waitingOn = cmd.asCommitted().waitingOn();
+ waitingOn.waitingOn.reverseForEach(null, null, null, null, (i1,
i2, i3, i4, i) -> {
+ if (i < waitingOn.txnIdCount())
+ {
+ // blocked on txn
+ cmdTxnState.blockedBy.add(waitingOn.txnId(i));
Review Comment:
This will potentially list a lot of false dependencies. We update the
waitingOn bitset for transactions lazily, waiting only on the highest TxnId
in the hope that it provides a cheap mechanism for notifying us about earlier
TxnIds without us having to load them. We should really only list the one TxnId
that we are blocking on, or at least filter the resulting list down to those
that have not actually been Applied.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]