belliottsmith commented on code in PR #232: URL: https://github.com/apache/cassandra-accord/pull/232#discussion_r2243433273
########## accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java: ########## @@ -754,45 +762,88 @@ public String reason() } } - long nextInvokerId() + protected void run(TxnStateKind runKind, TxnState run, SafeCommandStore safeStore, SafeCommand safeCommand) { - return nextInvokerId++; + // check this after fetching SafeCommand, as doing so can erase the command (and invalidate our state) + if (run.isDone(runKind)) + return; + + Invariants.require(get(run.txnId) == run, "Transaction state for %s does not match expected one %s", run.txnId, run); + Invariants.require(run.scheduledTimer() != runKind, "We are actively executing %s, but we are also scheduled to run this same TxnState later. This should not happen.", runKind); + Invariants.require(run.pendingTimer() != runKind, "We are actively executing %s, but we also have a pending scheduled task to run this same TxnState later. This should not happen.", runKind); + + validatePreRunState(run, runKind); + if (runKind == Home) + { + boolean isRetry = run.homeProgress() != Queued; + if (isRetry) run.incrementHomeRetryCounter(); + run.runHome(DefaultProgressLog.this, safeStore, safeCommand); + } + else + { + boolean isRetry = run.waitingProgress() != Queued; + if (isRetry) run.incrementWaitingRetryCounter(); + run.runWaiting(DefaultProgressLog.this, safeStore, safeCommand); + } } - ObjectHashSet<Object> pending(TxnStateKind kind) + long nextCallbackId() + { + return ++nextCallbackId; + } + + Object2ObjectHashMap<TxnId, PendingTask> pending(TxnStateKind kind) { return kind == Waiting ? pendingWaiting : pendingHome; } - void registerPending(TxnStateKind kind, TxnId txnId, Object object) + void registerPending(TxnStateKind kind, TxnId txnId, PendingTask register) { - ObjectHashSet<Object> pending = pending(kind); - Invariants.require(!pending.contains(txnId)); - pending.add(object); + Object2ObjectHashMap<TxnId, PendingTask> collection = pending(kind); + PendingTask existing = collection.putIfAbsent(txnId, register); + Invariants.require(existing == null); } boolean hasPending(TxnStateKind kind, TxnId txnId) { - return pending(kind).contains(txnId); + return pending(kind).containsKey(txnId); } - void start(CallbackInvoker<?, ?> invoker, Object task) + void start(CallbackInvoker<?, ?> invoker, Object debug) { - active.put(invoker.id, task); + // task is an arbitrary object to help debug, but must be non-null + // TODO (expected): make active debuggable via virtual table or other mechanism + if (debug == null) + debug = invoker; + active.put(invoker.id, debug); } - boolean complete(SafeCommandStore safeStore, TxnStateKind kind, long id, Object active) + boolean complete(TxnStateKind kind, long id, TxnId txnId, PendingTask completing) { - this.active.remove(id); - boolean result = pending(kind).remove(active); - if (runBufferIndex < runBufferCount) - accept(safeStore); - return result; + boolean stillActive = active.remove(id) != null; + return complete(kind, txnId, completing) && stillActive; + } + + boolean complete(TxnStateKind kind, TxnId txnId, PendingTask completing) + { + return pending(kind).remove(txnId, completing); } - void clearPending(TxnStateKind kind, TxnId txnId) + void clearPendingAndActive(TxnStateKind kind, TxnId txnId) { - pending(kind).remove(txnId); + PendingTask pending = pending(kind).remove(txnId); + if (pending instanceof CallbackInvoker<?,?>) + active.remove(((CallbackInvoker<?, ?>) pending).id); + } + + public void requeue(SafeCommandStore safeStore, TxnStateKind kind, TxnId txnId) + { + PendingTask pending = pending(kind).remove(txnId); Review Comment: yep -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org