belliottsmith commented on code in PR #232:
URL: https://github.com/apache/cassandra-accord/pull/232#discussion_r2243433273


##########
accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java:
##########
@@ -754,45 +762,88 @@ public String reason()
         }
     }
 
-    long nextInvokerId()
+    protected void run(TxnStateKind runKind, TxnState run, SafeCommandStore 
safeStore, SafeCommand safeCommand)
     {
-        return nextInvokerId++;
+        // check this after fetching SafeCommand, as doing so can erase the 
command (and invalidate our state)
+        if (run.isDone(runKind))
+            return;
+
+        Invariants.require(get(run.txnId) == run, "Transaction state for %s 
does not match expected one %s", run.txnId, run);
+        Invariants.require(run.scheduledTimer() != runKind, "We are actively 
executing %s, but we are also scheduled to run this same TxnState later. This 
should not happen.", runKind);
+        Invariants.require(run.pendingTimer() != runKind, "We are actively 
executing %s, but we also have a pending scheduled task to run this same 
TxnState later. This should not happen.", runKind);
+
+        validatePreRunState(run, runKind);
+        if (runKind == Home)
+        {
+            boolean isRetry = run.homeProgress() != Queued;
+            if (isRetry) run.incrementHomeRetryCounter();
+            run.runHome(DefaultProgressLog.this, safeStore, safeCommand);
+        }
+        else
+        {
+            boolean isRetry = run.waitingProgress() != Queued;
+            if (isRetry) run.incrementWaitingRetryCounter();
+            run.runWaiting(DefaultProgressLog.this, safeStore, safeCommand);
+        }
     }
 
-    ObjectHashSet<Object> pending(TxnStateKind kind)
+    long nextCallbackId()
+    {
+        return ++nextCallbackId;
+    }
+
+    Object2ObjectHashMap<TxnId, PendingTask> pending(TxnStateKind kind)
     {
         return kind == Waiting ? pendingWaiting : pendingHome;
     }
 
-    void registerPending(TxnStateKind kind, TxnId txnId, Object object)
+    void registerPending(TxnStateKind kind, TxnId txnId, PendingTask register)
     {
-        ObjectHashSet<Object> pending = pending(kind);
-        Invariants.require(!pending.contains(txnId));
-        pending.add(object);
+        Object2ObjectHashMap<TxnId, PendingTask> collection = pending(kind);
+        PendingTask existing = collection.putIfAbsent(txnId, register);
+        Invariants.require(existing == null);
     }
 
     boolean hasPending(TxnStateKind kind, TxnId txnId)
     {
-        return pending(kind).contains(txnId);
+        return pending(kind).containsKey(txnId);
     }
 
-    void start(CallbackInvoker<?, ?> invoker, Object task)
+    void start(CallbackInvoker<?, ?> invoker, Object debug)
     {
-        active.put(invoker.id, task);
+        // task is an arbitrary object to help debug, but must be non-null
+        // TODO (expected): make active debuggable via virtual table or other 
mechanism
+        if (debug == null)
+            debug = invoker;
+        active.put(invoker.id, debug);
     }
 
-    boolean complete(SafeCommandStore safeStore, TxnStateKind kind, long id, 
Object active)
+    boolean complete(TxnStateKind kind, long id, TxnId txnId, PendingTask 
completing)
     {
-        this.active.remove(id);
-        boolean result = pending(kind).remove(active);
-        if (runBufferIndex < runBufferCount)
-            accept(safeStore);
-        return result;
+        boolean stillActive = active.remove(id) != null;
+        return complete(kind, txnId, completing) && stillActive;
+    }
+
+    boolean complete(TxnStateKind kind, TxnId txnId, PendingTask completing)
+    {
+        return pending(kind).remove(txnId, completing);
     }
 
-    void clearPending(TxnStateKind kind, TxnId txnId)
+    void clearPendingAndActive(TxnStateKind kind, TxnId txnId)
     {
-        pending(kind).remove(txnId);
+        PendingTask pending = pending(kind).remove(txnId);
+        if (pending instanceof CallbackInvoker<?,?>)
+            active.remove(((CallbackInvoker<?, ?>) pending).id);
+    }
+
+    public void requeue(SafeCommandStore safeStore, TxnStateKind kind, TxnId 
txnId)
+    {
+        PendingTask pending = pending(kind).remove(txnId);

Review Comment:
   yep



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to