ifesdjeen commented on code in PR #4247:
URL: https://github.com/apache/cassandra/pull/4247#discussion_r2205865505


##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -492,54 +504,202 @@ public void forEach(Consumer<JournalKey> consumer)
     @Override
     public void replay(CommandStores commandStores)
     {
-        final Semaphore concurrency = 
Semaphore.newSemaphore(FBUtilities.getAvailableProcessors());
+        // TODO (expected): make the parallelisms configurable
+        final Semaphore commandParallelism = 
Semaphore.newSemaphore(getAvailableProcessors());
+        final int commandStoreParallelism = Math.max(Math.max(1, 
Math.min(getAvailableProcessors(), 4)), getAvailableProcessors() / 4);
         final AtomicBoolean abort = new AtomicBoolean();
+        // TODO (expected): balance work submission by AccordExecutor
+        final IntArrayList activeCommandStoreIds = new IntArrayList();
+        final ReplayQueue pendingCommandStores = new 
ReplayQueue(commandStores.all());
 
-        try (CloseableIterator<Journal.KeyRefs<JournalKey>> iter = 
journalTable.keyIterator())
+        class ReplayStream implements Closeable
         {
-            JournalKey prev = null;
-            while (iter.hasNext())
+            final CommandStore commandStore;
+            final Loader loader;
+            final CloseableIterator<Journal.KeyRefs<JournalKey>> iter;
+            JournalKey prev;
+
+            public ReplayStream(CommandStore commandStore)
             {
-                if (abort.get())
-                    break;
+                this.commandStore = commandStore;
+                this.loader = commandStore.loader();
+                this.iter = journalTable.keyIterator(new 
JournalKey(TxnId.NONE, COMMAND_DIFF, commandStore.id()), new 
JournalKey(TxnId.MAX.withoutNonIdentityFlags(), COMMAND_DIFF, 
commandStore.id()));
+            }
 
+            boolean replay()
+            {
                 JournalKey key;
                 long[] segments;
+                while (true)
                 {
+                    if (!iter.hasNext())
+                    {
+                        logger.info("Completed replay of {}", commandStore);
+                        return false;
+                    }
+
                     Journal.KeyRefs<JournalKey> ref = iter.next();
-                    key = ref.key();
-                    if (key.type != JournalKey.Type.COMMAND_DIFF)
+                    if (ref.key().type != COMMAND_DIFF)
                         continue;
 
+                    key = ref.key();
                     segments = journalTable.shouldIndex(key) ? 
ref.copyOfSegments() : null;
+                    break;
                 }
 
-                CommandStore commandStore = 
commandStores.forId(key.commandStoreId);
-                AccordCommandStoreLoader loader = (AccordCommandStoreLoader) 
commandStore.loader();
 
                 TxnId txnId = key.id;
                 Invariants.require(prev == null ||
                                    key.commandStoreId != prev.commandStoreId ||
                                    key.id.compareTo(prev.id) != 0,
                                    "duplicate key detected %s == %s", key, 
prev);
                 prev = key;
-
-                concurrency.acquireThrowUncheckedOnInterrupt(1);
+                commandParallelism.acquireThrowUncheckedOnInterrupt(1);
                 loader.load(txnId)
                       .map(route -> {
-                          if (segments != null)
+                          if (segments != null && route != null)
                           {
                               for (long segment : segments)
-                                  journalTable.safeNotify(index -> 
index.update(segment, key.commandStoreId, txnId, route));
+                                  journalTable.safeNotify(index -> 
index.update(segment, key.commandStoreId, txnId, (Route)route));
                           }
                           return null;
                       }).begin((success, fail) -> {
-                          concurrency.release(1);
+                          commandParallelism.release(1);
                           if (fail != null && !journal.handleError("Could not 
replay command " + txnId, fail))
                               abort.set(true);
                       });
+
+                return true;
+            }
+
+            @Override
+            public void close()
+            {
+                iter.close();
+            }
+        }
+
+        final Int2ObjectHashMap<ReplayStream> replayStreams = new 
Int2ObjectHashMap<>();
+        try
+        {
+            int cur = 0;
+            while (true)
+            {
+                if (abort.get())
+                    break;
+
+                if (cur == activeCommandStoreIds.size())
+                {
+                    if (activeCommandStoreIds.size() < commandStoreParallelism 
&& !pendingCommandStores.isEmpty())
+                    {
+                        CommandStore next = pendingCommandStores.next();
+                        int id = next.id();
+                        activeCommandStoreIds.add(id);
+                        replayStreams.put(id, new ReplayStream(next));

Review Comment:
   Talked on Slack: let's leave it as-is, since we can't pick up segments for 
_already active_ journals anyway, and other ones won't generate new work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to