ifesdjeen commented on code in PR #226:
URL: https://github.com/apache/cassandra-accord/pull/226#discussion_r2222626125


##########
accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java:
##########
@@ -351,15 +353,31 @@ public void clearBefore(SafeCommandStore safeStore, TxnId 
clearWaitingBefore, Tx
         }
     }
 
+    @Override
+    public void start()
+    {
+        commandStore.maybeExecuteImmediately(() -> {
+            stopped = false;
+            accept(null);
+        });
+    }
+
+    @Override
+    public void stop()
+    {
+        stopped = true;

Review Comment:
   Shouldn't `stopped` only be changed from within the command store?



##########
accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java:
##########
@@ -574,32 +615,41 @@ private void processAwaitingEpoch(@Nullable 
SafeCommandStore safeStore, List<Run
         for (int i = 0 ; i < awaitingEpochBufferCount ; ++i)
         {
             RunInvoker awaiting = awaitingEpochBuffer[i];
-            if (awaiting.run.txnId.epoch() <= hasEpoch) 
addIfReadyElseSubmit(safeStore, awaiting, readyToRun);
+            if (awaiting.run.txnId.epoch() <= hasEpoch) 
addToRunBuffer(awaiting);
             else awaitingEpochBuffer[retainCount++] = awaiting;
         }
         awaitingEpochBufferCount = retainCount;
         if (retainCount == 0) awaitingEpochBuffer = 
EMPTY_AWAITING_EPOCH_BUFFER;
         else if (retainCount < awaitingEpochBuffer.length / 2) 
awaitingEpochBuffer = Arrays.copyOf(awaitingEpochBuffer, retainCount);
     }
 
-    private void processReadyToRun(SafeCommandStore safeStore, 
List<RunInvoker> readyToRun)
+    private void processRunBuffer(SafeCommandStore safeStore)
     {
-        if (safeStore == null || readyToRun.isEmpty())
-            return;
-
-        int i = 0;
-        try
+        while (runBufferIndex < runBufferCount)
         {
-            while (i < readyToRun.size())
-                readyToRun.get(i++).accept(safeStore);
-        }
-        finally
-        {
-            while (i < readyToRun.size())
+            if (active.size() >= maxConcurrency)
+            {
+                maybeShrinkRunBuffer();
+                return;
+            }
+
+            RunInvoker run = (RunInvoker) runBuffer[runBufferIndex];
+            if (safeStore == null || !safeStore.canExecuteWith(run))
             {
-                commandStore.execute((PreLoadContext.Empty) () -> "Run 
ProgressLog", readyToRun.get(i++), node.agent());
+                maybeShrinkRunBuffer();
+                commandStore.execute(run, this);
+                return;
             }
+
+            ++runBufferIndex;
+            try { run.accept(safeStore); }
+            catch (Throwable t) { node.agent().onUncaughtException(t); }
         }
+
+        if (runBuffer.length > ArrayBuffers.MIN_BUFFER_SIZE)
+            cachedAny().forceDiscard(runBuffer, runBufferCount);
+        runBufferIndex = runBufferCount = 0;

Review Comment:
   should this one be 
   
   ```
   runBufferIndex = runBufferCount = 0;
   ```



##########
accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java:
##########
@@ -690,50 +727,59 @@ public int hashCode()
         {
             return run.txnId.hashCode();
         }
+
+        @Override
+        public TxnId primaryTxnId()
+        {
+            return run.txnId;
+        }
+
+        @Override
+        public String reason()
+        {
+            return "Invoke " + runKind + " Progress Log";
+        }
     }
 
     long nextInvokerId()
     {
         return nextInvokerId++;
     }
 
-    ObjectHashSet<Object> active(TxnStateKind kind)
-    {
-        return kind == Waiting ? activeWaiting : activeHome;
-    }
-
-    void registerActive(TxnStateKind kind, TxnId txnId, Object object)
+    ObjectHashSet<Object> pending(TxnStateKind kind)
     {
-        ObjectHashSet<Object> active = active(kind);
-        Invariants.require(!active.contains(txnId));
-        active.add(object);
+        return kind == Waiting ? pendingWaiting : pendingHome;
     }
 
-    boolean hasActive(TxnStateKind kind, TxnId txnId)
+    void registerPending(TxnStateKind kind, TxnId txnId, Object object)
     {
-        return active(kind).contains(txnId);
+        ObjectHashSet<Object> pending = pending(kind);
+        Invariants.require(!pending.contains(txnId));
+        pending.add(object);
     }
 
-    void debugActive(Object debug, CallbackInvoker<?, ?> invoker)
+    boolean hasPending(TxnStateKind kind, TxnId txnId)
     {
-        if (debugActive != null)
-            debugActive.put(invoker.id, debug);
+        return pending(kind).contains(txnId);
     }
 
-    void undebugActive(CallbackInvoker<?, ?> invoker)
+    void start(CallbackInvoker<?, ?> invoker, Object task)
     {
-        if (debugActive != null)
-            debugActive.remove(invoker.id);
+        active.put(invoker.id, task);
     }
 
-    boolean deregisterActive(TxnStateKind kind, Object object)
+    boolean complete(SafeCommandStore safeStore, TxnStateKind kind, long id, 
Object active)
     {
-        return active(kind).remove(object);
+        this.active.remove(id);
+        boolean result = pending(kind).remove(active);
+        if (runBufferIndex < runBufferCount)
+            accept(safeStore);
+        return result;
     }
 
     void clearActive(TxnStateKind kind, TxnId txnId)
     {
-        active(kind).remove(txnId);
+        pending(kind).remove(txnId);

Review Comment:
   Should we rename the method to clearPending?



##########
accord-core/src/main/java/accord/impl/AbstractReplayer.java:
##########
@@ -23,21 +23,46 @@
 import accord.local.CommandStore;
 import accord.local.Commands;
 import accord.local.PreLoadContext;
+import accord.local.RedundantBefore;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
+import accord.local.StoreParticipants;
 import accord.primitives.Participants;
 import accord.primitives.SaveStatus;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
 
+import static 
accord.local.RedundantStatus.Property.LOCALLY_DURABLE_TO_COMMAND_STORE;
+import static 
accord.local.RedundantStatus.Property.LOCALLY_DURABLE_TO_DATA_STORE;
 import static accord.primitives.SaveStatus.Applying;
 import static accord.primitives.SaveStatus.PreApplied;
 import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome;
 import static accord.primitives.Status.Applied;
 import static accord.primitives.Txn.Kind.Write;
 
-public abstract class AbstractLoader implements Journal.Loader
+public abstract class AbstractReplayer implements Journal.Replayer
 {
+    final RedundantBefore redundantBefore;
+    final TxnId minReplay;
+
+    protected AbstractReplayer(RedundantBefore redundantBefore)
+    {
+        this.redundantBefore = redundantBefore;
+        this.minReplay = TxnId.noneIfNull(redundantBefore.foldl((b, v) -> 
TxnId.nonNullOrMin(v, TxnId.min(b.maxBound(LOCALLY_DURABLE_TO_DATA_STORE), 
b.maxBound(LOCALLY_DURABLE_TO_COMMAND_STORE))), null, ignore -> false));
+    }
+
+    protected boolean maybeShouldReplay(TxnId txnId)
+    {
+        return txnId.compareTo(minReplay) >= 0;
+    }
+
+    protected boolean shouldReplay(TxnId txnId, StoreParticipants participants)
+    {
+        Participants<?> search = participants.route();
+        if (search == null) search = participants.hasTouched();

Review Comment:
   Can't it be that hasTouched is also null, in which case we can opt for a 
global min?



##########
accord-core/src/main/java/accord/local/RedundantStatus.java:
##########
@@ -86,32 +88,46 @@ public enum Property
          * See also {@link SafeCommandStore#safeToReadAt()}.
          * TODO (expected): do we need to distinguish this case from DEFUNCT?
          */
-        PRE_BOOTSTRAP_OR_STALE             (true, true,  LT, LOCALLY_DEFUNCT),
-        PRE_BOOTSTRAP                      (true, true,  LT, 
PRE_BOOTSTRAP_OR_STALE),
+        PRE_BOOTSTRAP_OR_STALE             ( true,  true,  LT, 
LOCALLY_DEFUNCT),
+        PRE_BOOTSTRAP                      ( true,  true,  LT, 
PRE_BOOTSTRAP_OR_STALE),
 
         LOCALLY_WITNESSED                  (false,  true,  LE),
         // we've applied a sync point locally covering the transaction, but 
the transaction itself may not have applied
         LOCALLY_SYNCED                     (false,  true,  LE, 
LOCALLY_REDUNDANT),
-        LOCALLY_APPLIED                    (true, false, LE, LOCALLY_SYNCED),
+        LOCALLY_APPLIED                    ( true, false,  LE, LOCALLY_SYNCED),
 
         /**
          * We have fully executed until across all a majority of replicas for 
the range in question,
          * but not necessarily ourselves.
          */
-        MAJORITY_APPLIED                 (false,  true,  LE),
+        MAJORITY_APPLIED                   (false,  true,  LE),
 
         /**
          * We have fully executed until across all healthy non-bootstrapping 
replicas for the range in question,
          * but not necessarily ourselves.
          */
-        SHARD_APPLIED                      (false, true, LE, MAJORITY_APPLIED),
+        SHARD_APPLIED                      (false,  true,  LE, 
MAJORITY_APPLIED),
 
         TRUNCATE_BEFORE                    (false,  true,  LT, SHARD_APPLIED, 
LOCALLY_SYNCED),
         GC_BEFORE                          (false,  true,  LT, 
TRUNCATE_BEFORE),
 
         // not persisted
         WAS_OWNED                          (false,  false, LT, 
LOCALLY_DEFUNCT),
-        NOT_OWNED                          (false, false, LT),
+        NOT_OWNED                          (false,  false, LT),
+
+        // added later, so vaguely out of order (but no specific order to this 
enum)
+
+        /**
+         * We have applied the preceding transactions durably to the store, so 
that we can safely truncate the Write
+         * information as we will not need to replay it to the store (but may 
need to replay it to

Review Comment:
   unfinished sentence?



##########
accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java:
##########
@@ -466,40 +484,67 @@ public void invalidIfUncommitted(TxnId txnId)
     @Override
     public void accept(@Nullable SafeCommandStore safeStore)
     {
+        if (stopped || processing)
+            return;
+
         long nowMicros = node.elapsed(TimeUnit.MICROSECONDS);
+        processing = true;
         try
         {
-            if (DefaultProgressLogs.pauseForTest(this))
+            processAwaitingEpoch();
+            try (BufferList<TxnState> preRunBuffer = new BufferList<>())
             {
-                logger.info("Skipping progress log because it is paused for 
test");
-                return;
-            }
-
-            try (BufferList<RunInvoker> readyToRun = safeStore == null ? null 
: new BufferList<>())
-            {
-                processAwaitingEpoch(safeStore, readyToRun);
                 // drain to a buffer to avoid reentrancy in timers
-                runBuffer = EMPTY_RUN_BUFFER;
-                runBufferCount = 0;
-                timers.advance(nowMicros, this, 
DefaultProgressLog::addToRunBuffer);
-                processRunBuffer(safeStore, nowMicros, readyToRun);
-                cachedAny().forceDiscard(runBuffer, runBufferCount);
-                processReadyToRun(safeStore, readyToRun);
+                timers.advance(nowMicros, preRunBuffer, BufferList::add);
+                updateRunBuffer(nowMicros, preRunBuffer);
             }
+            processRunBuffer(safeStore);
+
             if (awaitingEpochBufferCount > 0)
                 rerunWithPendingEpoch();
         }
         catch (Throwable t)
         {
             node.agent().onUncaughtException(t);
         }
+        finally
+        {
+            processing = false;
+        }
     }
 
-    private void addToRunBuffer(TxnState add)
+    private void addToRunBuffer(RunInvoker readyToRun)
     {
         if (runBufferCount == runBuffer.length)
-            runBuffer = cachedAny().resize(runBuffer, runBufferCount, 
Math.max(8, runBuffer.length * 2));
-        runBuffer[runBufferCount++] = add;
+        {
+            int newCount = runBufferCount - runBufferIndex;
+            Object[] newBuffer = cachedAny().get(Math.max(8, newCount * 2));
+            replaceRunBuffer(newBuffer);
+        }
+        runBuffer[runBufferCount++] = readyToRun;
+    }
+
+    private void replaceRunBuffer(Object[] newBuffer)
+    {
+        Object[] prevBuffer = runBuffer;
+        int prevCount = runBufferCount;
+        int newCount = prevCount - runBufferIndex;
+        System.arraycopy(prevBuffer, runBufferIndex, newBuffer, 0, newCount);
+        runBuffer = newBuffer;
+        runBufferIndex = 0;
+        runBufferCount = newCount;
+        if (prevBuffer.length >= ArrayBuffers.MIN_BUFFER_SIZE)
+            cachedAny().forceDiscard(prevBuffer, prevCount);
+    }
+
+    private void maybeShrinkRunBuffer()
+    {
+        if (runBuffer.length >= (runBufferCount - runBufferIndex)/2)
+        {
+            int newCount = runBufferCount - runBufferIndex;
+            Object[] newBuffer = new Object[newCount + (newCount/2)];

Review Comment:
   Just wondering: why not try using `cachedAny` when shrinking?



##########
accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java:
##########
@@ -767,6 +813,16 @@ public void maybeNotify()
         }
     }
 
+    public void setMaxConcurrency(int maxConcurrency)
+    {
+        this.maxConcurrency = maxConcurrency;

Review Comment:
   nit: should we add a strictly-positive check?
   



##########
accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java:
##########
@@ -466,40 +484,67 @@ public void invalidIfUncommitted(TxnId txnId)
     @Override
     public void accept(@Nullable SafeCommandStore safeStore)
     {
+        if (stopped || processing)
+            return;
+
         long nowMicros = node.elapsed(TimeUnit.MICROSECONDS);
+        processing = true;
         try
         {
-            if (DefaultProgressLogs.pauseForTest(this))
+            processAwaitingEpoch();
+            try (BufferList<TxnState> preRunBuffer = new BufferList<>())
             {
-                logger.info("Skipping progress log because it is paused for 
test");
-                return;
-            }
-
-            try (BufferList<RunInvoker> readyToRun = safeStore == null ? null 
: new BufferList<>())
-            {
-                processAwaitingEpoch(safeStore, readyToRun);
                 // drain to a buffer to avoid reentrancy in timers
-                runBuffer = EMPTY_RUN_BUFFER;
-                runBufferCount = 0;
-                timers.advance(nowMicros, this, 
DefaultProgressLog::addToRunBuffer);
-                processRunBuffer(safeStore, nowMicros, readyToRun);
-                cachedAny().forceDiscard(runBuffer, runBufferCount);
-                processReadyToRun(safeStore, readyToRun);
+                timers.advance(nowMicros, preRunBuffer, BufferList::add);
+                updateRunBuffer(nowMicros, preRunBuffer);
             }
+            processRunBuffer(safeStore);
+
             if (awaitingEpochBufferCount > 0)
                 rerunWithPendingEpoch();
         }
         catch (Throwable t)
         {
             node.agent().onUncaughtException(t);
         }
+        finally
+        {
+            processing = false;
+        }
     }
 
-    private void addToRunBuffer(TxnState add)
+    private void addToRunBuffer(RunInvoker readyToRun)
     {
         if (runBufferCount == runBuffer.length)
-            runBuffer = cachedAny().resize(runBuffer, runBufferCount, 
Math.max(8, runBuffer.length * 2));
-        runBuffer[runBufferCount++] = add;
+        {
+            int newCount = runBufferCount - runBufferIndex;
+            Object[] newBuffer = cachedAny().get(Math.max(8, newCount * 2));
+            replaceRunBuffer(newBuffer);
+        }
+        runBuffer[runBufferCount++] = readyToRun;
+    }
+
+    private void replaceRunBuffer(Object[] newBuffer)
+    {
+        Object[] prevBuffer = runBuffer;
+        int prevCount = runBufferCount;
+        int newCount = prevCount - runBufferIndex;
+        System.arraycopy(prevBuffer, runBufferIndex, newBuffer, 0, newCount);
+        runBuffer = newBuffer;
+        runBufferIndex = 0;
+        runBufferCount = newCount;
+        if (prevBuffer.length >= ArrayBuffers.MIN_BUFFER_SIZE)
+            cachedAny().forceDiscard(prevBuffer, prevCount);
+    }
+
+    private void maybeShrinkRunBuffer()
+    {
+        if (runBuffer.length >= (runBufferCount - runBufferIndex)/2)

Review Comment:
   I might be misreading, but did you mean we should do `* 2` here?  
   
   The run buffer's length will always be larger than the number of elements in it, so it 
seems like the idea was to shrink when the buffer is 50%+ empty?



##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -119,6 +123,62 @@ public abstract class InMemoryCommandStore extends 
CommandStore
     private static final Logger logger = 
LoggerFactory.getLogger(InMemoryCommandStore.class);
     private static final boolean CHECK_DEPENDENCY_INVARIANTS = false;
 
+    public static class Snapshot extends AsyncResults.SettableResult<Snapshot>
+    {
+        private static final AtomicLong nextId = new AtomicLong();
+
+        private final long id = nextId.incrementAndGet();

Review Comment:
   nit: unused. still usable for debug?



##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -1128,88 +1197,75 @@ public static void logDependencyGraph(CommandStore 
commandStore, TxnId txnId)
     // redundantBefore, durableBefore, newBootstrapBeganAt, safeToRead, 
rangesForEpoch are
     // not replayed here. It is assumed that persistence on the application 
side will ensure
     // they are brought up to latest values _before_ replay.
-    public void clearForTesting()
+    public void clear()
     {
         Invariants.require(current == null);
         progressLog.clear();
         commands.clear();
         commandsByExecuteAt.clear();
         commandsForKey.clear();
         rangeCommands.clear();
+        listeners.clear();
         unsafeSetRejectBefore(new RejectBefore());
     }
 
-    public Journal.Loader loader()
+    public Journal.Replayer replayer()
     {
-        return new CommandLoader(this);
+        return new CommandReplayer(this);
     }
 
-    private static class CommandLoader extends AbstractLoader
+    private static class CommandReplayer extends AbstractReplayer
     {
         private final InMemoryCommandStore commandStore;
 
-        private CommandLoader(InMemoryCommandStore commandStore)
+        private CommandReplayer(InMemoryCommandStore commandStore)
         {
+            super(commandStore.unsafeGetRedundantBefore());
             this.commandStore = commandStore;
         }
 
-        private PreLoadContext context(Command command, LoadKeys loadKeys)
-        {
-            TxnId txnId = command.txnId();
-            AbstractUnseekableKeys keys = null;
-
-            if (CommandsForKey.manages(txnId))
-                keys = (AbstractUnseekableKeys) 
command.participants().hasTouched();
-            else if (!CommandsForKey.managesExecution(txnId) && 
command.hasBeen(Status.Stable) && !command.hasBeen(Status.Truncated))
-                keys = command.asCommitted().waitingOn.keys;
-
-            if (keys != null)
-                return PreLoadContext.contextFor(txnId, keys, loadKeys, WRITE, 
"Replay");
-
-            return PreLoadContext.contextFor(txnId, "Replay");
-        }
-
-        protected TxnId loadInternal(Command command, SafeCommandStore 
safeStore)
-        {
-            TxnId txnId = command.txnId();
-            Cleanup cleanup = Cleanup.shouldCleanup(FULL, safeStore, command, 
command.participants());
-            if (cleanup != Cleanup.NO)
-                command = Commands.purge(safeStore, command, cleanup);
-
-            safeStore.unsafeGetNoCleanup(txnId).update(safeStore, command);
-            return txnId;
-        }
-
-        private AsyncChain<TxnId> load(Command command)
+        private AsyncChain<Void> apply(Command command)
         {
             return 
AsyncChains.success(commandStore.executeInContext(commandStore,
-                                                                     
context(command, ASYNC),
-                                                                     
(SafeCommandStore safeStore) -> loadInternal(command, safeStore)));
-        }
-
-        private AsyncChain<Void> apply(TxnId txnId)
-        {
-            return 
AsyncChains.success(commandStore.executeInContext(commandStore,
-                                                                     
PreLoadContext.contextFor(txnId, "Replay"),
+                                                                     
PreLoadContext.contextFor(command.txnId(), "Replay"),
                                                                      
(SafeCommandStore safeStore) -> {
-                                                                         
initialiseState(safeStore, txnId);
+                                                                         
initialiseState(safeStore, command.txnId());
                                                                          
return null;
                                                                      }));
         }
 
         @Override
-        public AsyncChain<Void> load(TxnId txnId)
+        public AsyncChain<Void> replay(TxnId txnId)
         {
             // TODO (required): consider this race condition some more:
             //      - can we avoid double-applying?
             //      - is this definitely safe?
+            Command command = null;
             if (commandStore.hasCommand(txnId))
-                return apply(txnId);
+                command = commandStore.commands.get(txnId).value();
 
-            Command command = 
commandStore.journal.loadCommand(commandStore.id, txnId, 
commandStore.unsafeGetRedundantBefore(), commandStore.durableBefore());
             if (command == null)
+            {
+                command = commandStore.journal.loadCommand(commandStore.id, 
txnId, commandStore.unsafeGetRedundantBefore(), commandStore.durableBefore());
+                if (command != null)
+                {
+                    Cleanup cleanup = Cleanup.shouldCleanup(FULL, command, 
commandStore.unsafeGetRedundantBefore(), commandStore.durableBefore());
+                    if (cleanup != Cleanup.NO)
+                        command = Commands.purgeUnsafe(commandStore, command, 
cleanup);
+
+                    // initialise basic state, but don't call safeStore.update 
so we don't initialise listeners etc
+                    GlobalCommand global = 
commandStore.commands.computeIfAbsent(txnId, GlobalCommand::new);
+                    global.value(command);
+                    Timestamp executeAt = command.executeAtIfKnown();
+                    if (executeAt != null)
+                        commandStore.commandsByExecuteAt.put(executeAt, 
global);
+                }
+            }
+
+            if (command == null || !maybeShouldReplay(txnId) || 
!shouldReplay(txnId, command.participants()))

Review Comment:
   Could you explain why we still need to replay even if we do not apply? 
   
   For some redundantBefore states we should be able to fully avoid loading, 
too, right? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to