ifesdjeen commented on code in PR #4230:
URL: https://github.com/apache/cassandra/pull/4230#discussion_r2190747505


##########
src/java/org/apache/cassandra/service/accord/AccordExecutor.java:
##########
@@ -224,6 +229,38 @@ public Stream<? extends DebuggableTaskRunner> active()
         return Stream.of();
     }
 
+    public void waitForQuiescence()
+    {
+        Condition condition;
+        lock.lock();
+        try
+        {
+            if (tasks == 0)
+            {
+                Invariants.require(running == 0);
+                return;
+            }
+            if (waitingForQuiescence == null)
+                waitingForQuiescence = new ArrayList<>();
+            condition = Condition.newOneTimeCondition();
+            waitingForQuiescence.add(condition);
+        }
+        finally
+        {
+            lock.unlock();
+        }
+        condition.awaitThrowUncheckedOnInterrupt();
+    }
+
+    protected void signalQuiescent()
+    {
+        if (waitingForQuiescence != null)

Review Comment:
   Could you assert or document that this should only be called while holding the lock?
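
   For illustration only, a minimal sketch of such an assertion. It assumes the lock is a `java.util.concurrent.locks.ReentrantLock`, which the hunk above does not confirm; `QuiescenceSketch`, `register` and `signalQuiescentLocked` are hypothetical names, and a `CountDownLatch` stands in for the one-time `Condition`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the executor; not the actual AccordExecutor.
class QuiescenceSketch
{
    // Assumption: the real lock is (or behaves like) a ReentrantLock.
    private final ReentrantLock lock = new ReentrantLock();
    private List<CountDownLatch> waitingForQuiescence;

    // Registers a waiter under the lock and returns the latch it should await.
    CountDownLatch register()
    {
        lock.lock();
        try
        {
            if (waitingForQuiescence == null)
                waitingForQuiescence = new ArrayList<>();
            CountDownLatch latch = new CountDownLatch(1);
            waitingForQuiescence.add(latch);
            return latch;
        }
        finally
        {
            lock.unlock();
        }
    }

    // Must only be called while holding the lock; fails fast otherwise.
    void signalQuiescent()
    {
        if (!lock.isHeldByCurrentThread())
            throw new IllegalStateException("signalQuiescent() must be called with the lock held");
        if (waitingForQuiescence != null)
        {
            waitingForQuiescence.forEach(CountDownLatch::countDown);
            waitingForQuiescence = null;
        }
    }

    // Shows the intended calling pattern: take the lock, then signal.
    void signalQuiescentLocked()
    {
        lock.lock();
        try
        {
            signalQuiescent();
        }
        finally
        {
            lock.unlock();
        }
    }
}
```

   The check makes a missing lock fail immediately at the call site instead of surfacing later as a racy update to the waiter list.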



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -503,19 +510,26 @@ public void replay(CommandStores commandStores)
                                        ref.key().id.compareTo(prev.id) != 0,
                                        "duplicate key detected %s == %s", ref.key(), prev);
                     prev = ref.key();
-                    AsyncChains.getUnchecked(loader.load(txnId)
-                                                   .map(command -> {
-                                                       if (journalTable.shouldIndex(ref.key())
-                                                           && command.participants() != null
-                                                           && command.participants().route() != null)
-                                                       {
-                                                           ref.segments(segment -> {
-                                                               journalTable.safeNotify(index -> index.update(segment, ref.key().commandStoreId, txnId, command.participants().route()));
-                                                           });
-                                                       }
-                                                       return command;
-                                                   })
-                                                   .beginAsResult());
+
+                    Throwable rethrow = failures.poll();
+                    if (rethrow != null)

Review Comment:
   I think this will break unsafe replay, which allows us to replay even when a 
particular command load fails. This is handled by `journal.handleError`.
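
   To make the concern concrete, here is a rough sketch of the difference between rethrowing unconditionally and routing the failure through an error hook. Everything in it is hypothetical: `TolerantReplaySketch`, the `unsafeReplay` flag and this `handleError` are illustrative stand-ins, not the actual `journal.handleError` contract.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of replay with a pluggable failure policy.
class TolerantReplaySketch
{
    final boolean unsafeReplay;
    final List<String> skipped = new ArrayList<>();

    TolerantReplaySketch(boolean unsafeReplay)
    {
        this.unsafeReplay = unsafeReplay;
    }

    // Illustrative error hook: aborts in strict mode, records and continues in unsafe mode.
    void handleError(String txnId, Throwable failure)
    {
        if (!unsafeReplay)
            throw new IllegalStateException("Replay failed for " + txnId, failure);
        skipped.add(txnId);
    }

    // Loads each id in order and returns how many loads succeeded.
    int replay(List<String> txnIds, Function<String, String> loader)
    {
        int loaded = 0;
        for (String txnId : txnIds)
        {
            try
            {
                loader.apply(txnId);
                loaded++;
            }
            catch (RuntimeException failure)
            {
                // Rethrowing here directly would bypass the hook and abort in both modes.
                handleError(txnId, failure);
            }
        }
        return loaded;
    }
}
```

   In this sketch, a bare `throw` in place of the `handleError` call would make the unsafe mode indistinguishable from the strict one.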



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -503,19 +510,26 @@ public void replay(CommandStores commandStores)
                                        ref.key().id.compareTo(prev.id) != 0,
                                        "duplicate key detected %s == %s", ref.key(), prev);
                     prev = ref.key();
-                    AsyncChains.getUnchecked(loader.load(txnId)
-                                                   .map(command -> {
-                                                       if (journalTable.shouldIndex(ref.key())
-                                                           && command.participants() != null
-                                                           && command.participants().route() != null)
-                                                       {
-                                                           ref.segments(segment -> {
-                                                               journalTable.safeNotify(index -> index.update(segment, ref.key().commandStoreId, txnId, command.participants().route()));
-                                                           });
-                                                       }
-                                                       return command;
-                                                   })
-                                                   .beginAsResult());
+
+                    Throwable rethrow = failures.poll();
+                    if (rethrow != null)
+                        throw rethrow;
+
+                    concurrency.acquireThrowUncheckedOnInterrupt(1);
+                    loader.load(txnId)
+                          .map(route -> {

Review Comment:
   I had some positive results from releasing before indexing; not sure if you 
would like to consider this.
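
   Roughly what I mean, as a sketch under assumptions: a plain `java.util.concurrent.Semaphore` and `CompletableFuture` stand in for `concurrency` and the async chain, and `ReplaySketch`, `load` and `replayOne` are hypothetical names rather than the code in this PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

// Hypothetical model of bounded-concurrency replay.
class ReplaySketch
{
    final Semaphore concurrency;
    final List<String> indexed = Collections.synchronizedList(new ArrayList<>());

    ReplaySketch(int permits)
    {
        concurrency = new Semaphore(permits);
    }

    // Stand-in for the real load: completes asynchronously with a fake route.
    CompletableFuture<String> load(String txnId)
    {
        return CompletableFuture.supplyAsync(() -> "route-of-" + txnId);
    }

    // Acquires a permit, loads, then releases the permit before doing the index update.
    CompletableFuture<Void> replayOne(String txnId)
    {
        concurrency.acquireUninterruptibly();
        return load(txnId).<Void>handle((route, failure) -> {
            // Release first, on success or failure, so the next load can start
            // while this callback is still indexing.
            concurrency.release();
            if (failure == null && route != null)
                indexed.add(txnId + "=" + route);
            return null;
        });
    }
}
```

   The permit then bounds only the number of in-flight loads, and the indexing work no longer holds up the next acquire.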



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

