This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 90a6f6aaa7e453809eab9604a71f17751f125aa9
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Tue Apr 2 22:18:50 2024 -0700

    When jvm-dtest is shutting down an instance, TCM retries block the shutdown, causing the test to fail
    
    patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19514
---
 .../apache/cassandra/concurrent/Shutdownable.java  | 14 ++++++-
 .../cassandra/service/accord/AccordService.java    | 10 +++++
 .../apache/cassandra/tcm/EpochAwareDebounce.java   | 45 ++++++++++++++++------
 .../org/apache/cassandra/tcm/RemoteProcessor.java  |  2 +
 .../cassandra/distributed/impl/Instance.java       |  7 ++++
 5 files changed, 65 insertions(+), 13 deletions(-)

diff --git a/src/java/org/apache/cassandra/concurrent/Shutdownable.java b/src/java/org/apache/cassandra/concurrent/Shutdownable.java
index 185875b791..a72253fc87 100644
--- a/src/java/org/apache/cassandra/concurrent/Shutdownable.java
+++ b/src/java/org/apache/cassandra/concurrent/Shutdownable.java
@@ -19,7 +19,9 @@
 package org.apache.cassandra.concurrent;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.Shared;
 
 import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
@@ -29,6 +31,11 @@ public interface Shutdownable
 {
     boolean isTerminated();
 
+    default boolean isShutdown()
+    {
+        return isTerminated();
+    }
+
     /**
      * Shutdown once any remaining work has completed (however this is defined for the implementation).
      */
@@ -42,5 +49,10 @@ public interface Shutdownable
     /**
      * Await termination of this object, i.e. the cessation of all current and future work.
      */
-    public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException;
+    boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException;
+
+    default void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
+    {
+        ExecutorUtils.shutdownAndWait(timeout, unit, this);
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 9a44da4538..6e136029a0 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -35,7 +35,9 @@ import com.google.common.primitives.Ints;
 import accord.coordinate.TopologyMismatch;
 import accord.impl.CoordinateDurabilityScheduling;
 import org.apache.cassandra.cql3.statements.RequestValidations;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.accord.interop.AccordInteropAdapter.AccordInteropFactory;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.service.accord.api.*;
 import org.apache.cassandra.utils.*;
@@ -241,6 +243,14 @@ public class AccordService implements IAccordService, Shutdownable
         }
         AccordService as = new AccordService(AccordTopology.tcmIdToAccord(tcmId));
         as.startup();
+        if (StorageService.instance.isReplacingSameAddress())
+        {
+            // when replacing another node but using the same ip the hostId will also match, this causes no TCM transactions
+            // to be committed...
+            // In order to bootup correctly, need to pull in the current epoch
+            ClusterMetadata current = ClusterMetadata.current();
+            as.configurationService().notifyPostCommit(current, current, false);
+        }
         instance = as;
     }
 
diff --git a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
index f65c03d830..6d404f5a09 100644
--- a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
+++ b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.tcm;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
@@ -30,8 +29,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ExecutorFactory;
 import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.concurrent.Shutdownable;
 import org.apache.cassandra.tcm.log.LogState;
-import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.Promise;
@@ -42,7 +41,7 @@ import org.apache.cassandra.utils.concurrent.Promise;
  * comes in, we create a new future. If a request for a newer epoch comes in, we simply
  * swap out the current future reference for a new one which is requesting the newer epoch.
  */
-public class EpochAwareDebounce
+public class EpochAwareDebounce<T> implements Shutdownable
 {
     private static final Logger logger = LoggerFactory.getLogger(EpochAwareDebounce.class);
     public static final EpochAwareDebounce instance = new EpochAwareDebounce();
@@ -108,7 +107,37 @@ public class EpochAwareDebounce
         }
     }
 
-    private static class EpochAwareAsyncPromise extends AsyncPromise<ClusterMetadata>
+    @Override
+    public boolean isTerminated()
+    {
+        return executor.isTerminated();
+    }
+
+    @Override
+    public boolean isShutdown()
+    {
+        return executor.isShutdown();
+    }
+
+    @Override
+    public void shutdown()
+    {
+        executor.shutdown();
+    }
+
+    @Override
+    public Object shutdownNow()
+    {
+        return executor.shutdownNow();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException
+    {
+        return executor.awaitTermination(timeout, units);
+    }
+
+    private static class EpochAwareAsyncPromise<T> extends AsyncPromise<T>
     {
         private final Epoch epoch;
         public EpochAwareAsyncPromise(Epoch epoch)
@@ -116,12 +145,4 @@ public class EpochAwareDebounce
             this.epoch = epoch;
         }
     }
-
-    public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
-    {
-        logger.info("Cancelling {} in flight log fetch requests", inflightRequests.size());
-        for (Promise<LogState> toCancel : inflightRequests)
-            toCancel.cancel(true);
-        ExecutorUtils.shutdownAndWait(timeout, unit, executor);
-    }
 }
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index b849406e4f..ec3943d5e9 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -206,6 +206,8 @@ public final class RemoteProcessor implements Processor
             {
                 if (promise.isCancelled() || promise.isDone())
                     return;
+                if (EpochAwareDebounce.instance.isShutdown())
+                    promise.tryFailure(new IllegalStateException("Unable to retry as we are shutting down"));
                 if (!candidates.hasNext())
                     promise.tryFailure(new IllegalStateException(String.format("Ran out of candidates while sending %s: %s", verb, candidates)));
 
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index d7542f5f0c..8a508b03a5 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -898,6 +898,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
     {
         Future<?> future = async((ExecutorService executor) -> {
             Throwable error = null;
+            inInstancelogger.warn("Shutting down in thread {}", Thread.currentThread().getName());
 
             CompactionManager.instance.forceShutdown();
 
@@ -1238,6 +1239,11 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                 }
             }));
         }
+        // This is not used code, but it is here for when you run in a debugger...
+        // When shutdown gets blocked we need to be able to trace down which future is blocked, so this idx
+        // helps map the location... the reason we can't leverage here is the timeout logic is higher up, so
+        // 'idx' really only helps out in a debugger...
+        int idx = 0;
         for (Future<Throwable> future : results)
         {
             try
@@ -1250,6 +1256,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
             {
                 accumulate = Throwables.merge(accumulate, t);
             }
+            idx++;
         }
         return accumulate;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to