krummas commented on code in PR #2660:
URL: https://github.com/apache/cassandra/pull/2660#discussion_r1322427047


##########
conf/cassandra.yaml:
##########
@@ -723,6 +723,24 @@ memtable_allocation_type: heap_buffers
 # Min unit: MiB
 # repair_session_space:
 
+# Configure the retries for each of the repair messages that support it.
+#
+# For more details see https://issues.apache.org/jira/browse/CASSANDRA-18816
+#
+# repair:
+#   retries:
+#     type: Exponential
+#     max_attempts: 3
+#     base_sleep_time: 200ms
+#     max_sleep_time: 1s
+#     verbs:
+#       # Increase the timeout of validation responses due to them containing the merkle tree
+#       VALIDATION_RSP:

Review Comment:
   I don't think we should expose verb names in the config



##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -1899,6 +1899,11 @@ public static IFailureDetector newFailureDetector()
         return newFailureDetector.get();
     }
 
+    public static void setFailureDetector(Supplier<IFailureDetector> fn)

Review Comment:
   why do we need this when we have failureDetector() in SharedContext?



##########
src/java/org/apache/cassandra/config/Properties.java:
##########
@@ -68,6 +71,35 @@ public static Property andThen(Property root, Property leaf)
         return andThen(root, leaf, DELIMITER);
     }
 
+    @Nullable
+    public static Property andThenMap(Property root, String key, String delimiter)

Review Comment:
   I think we can remove all this if we simplify the config
   
   if we do need to keep this, we really need some docs on what this method does



##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -4906,4 +4917,9 @@ public static DataStorageSpec.IntMebibytesBound getSAISegmentWriteBufferSpace()
     {
         return conf.sai_options.segment_write_buffer_size;
     }
+
+    public static RepairRetrySpec getRepairRetrys()

Review Comment:
   `getRepairRetries`? `getRepairRetrySpec`?



##########
src/java/org/apache/cassandra/service/ActiveRepairService.java:
##########
@@ -213,25 +224,31 @@ public static ExecutorPlus repairCommandExecutor()
         return RepairCommandExecutorHandle.repairCommandExecutor;
     }
 
-    private final IFailureDetector failureDetector;
-    private final Gossiper gossiper;
     private final Cache<Integer, Pair<ParentRepairStatus, List<String>>> repairStatusByCmd;
+    public final ExecutorPlus snapshotExecutor;
 
-    public final ExecutorPlus snapshotExecutor = executorFactory().configurePooled("RepairSnapshotExecutor", 1)
-                                                                  .withKeepAlive(1, TimeUnit.HOURS)
-                                                                  .build();
+    public ActiveRepairService()
+    {
+        this(SharedContext.Global.instance, CompactionManager.instance::getPendingTasks);
+    }
 
-    public ActiveRepairService(IFailureDetector failureDetector, Gossiper gossiper)
+    @VisibleForTesting
+    public ActiveRepairService(SharedContext ctx,
+                               IntSupplier pendingTasks)

Review Comment:
   maybe move `CompactionManager` into `SharedContext`?



##########
src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java:
##########
@@ -172,14 +194,23 @@ public void doVerb(final Message<RepairMessage> message)
                         ColumnFamilyStore store = 
ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily);
                         if (store == null)
                         {
-                            logger.error("Table {}.{} was dropped during 
validation phase of repair {}",
-                                         desc.keyspace, desc.columnFamily, 
desc.parentSessionId);
-                            vState.phase.fail(String.format("Table %s.%s was 
dropped", desc.keyspace, desc.columnFamily));
-                            
MessagingService.instance().send(Message.out(VALIDATION_RSP, new 
ValidationResponse(desc)), message.from());
+                            String msg = String.format("Table %s.% was dropped 
during validation phase of repair %s", desc.keyspace, desc.columnFamily, 
desc.parentSessionId);
+                            vState.phase.fail(msg);
+                            logErrorAndSendFailureResponse(msg, message);

Review Comment:
   do we need to send both the failure and the empty `VALIDATION_RSP`? Looks like we call `tryFailure` on the sending side in both cases



##########
src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java:
##########
@@ -172,14 +194,23 @@ public void doVerb(final Message<RepairMessage> message)
                         ColumnFamilyStore store = 
ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily);
                         if (store == null)
                         {
-                            logger.error("Table {}.{} was dropped during 
validation phase of repair {}",
-                                         desc.keyspace, desc.columnFamily, 
desc.parentSessionId);
-                            vState.phase.fail(String.format("Table %s.%s was 
dropped", desc.keyspace, desc.columnFamily));
-                            
MessagingService.instance().send(Message.out(VALIDATION_RSP, new 
ValidationResponse(desc)), message.from());
+                            String msg = String.format("Table %s.% was dropped 
during validation phase of repair %s", desc.keyspace, desc.columnFamily, 
desc.parentSessionId);
+                            vState.phase.fail(msg);
+                            logErrorAndSendFailureResponse(msg, message);
+                            ctx.messaging().send(Message.out(VALIDATION_RSP, 
new ValidationResponse(desc)), message.from());
                             return;
                         }
 
-                        
ActiveRepairService.instance.consistent.local.maybeSetRepairing(desc.parentSessionId);
+                        try
+                        {
+                            
ctx.repair().consistent.local.maybeSetRepairing(desc.parentSessionId);
+                        }
+                        catch (Throwable t)
+                        {
+                            JVMStabilityInspector.inspectThrowable(t);
+                            logErrorAndSendFailureResponse(t.toString(), 
message);
+                            ctx.messaging().send(Message.out(VALIDATION_RSP, 
new ValidationResponse(desc)), message.from());

Review Comment:
   we should return here
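
   A minimal sketch of the control flow in question, assuming the code after the `catch` block goes on to start the validation (the hunk does not show it). The class, method, and step names are hypothetical stand-ins, not the handler's real API:

```java
import java.util.ArrayList;
import java.util.List;

final class EarlyReturnSketch
{
    // Records the steps taken so the control flow is observable.
    // setRepairingFails simulates maybeSetRepairing throwing;
    // returnAfterFailure toggles the early return under discussion.
    static List<String> handle(boolean setRepairingFails, boolean returnAfterFailure)
    {
        List<String> steps = new ArrayList<>();
        try
        {
            if (setRepairingFails)
                throw new IllegalStateException("maybeSetRepairing failed");
        }
        catch (RuntimeException e)
        {
            steps.add("send failure response");
            if (returnAfterFailure)
                return steps;
        }
        // Assumed follow-up work; without the early return it still runs after a failure.
        steps.add("start validation");
        return steps;
    }

    public static void main(String[] args)
    {
        System.out.println(handle(true, false));
        System.out.println(handle(true, true));
    }
}
```

   Without the early return, the failure path still reaches the follow-up step; with it, the handler stops once the failure has been reported.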



##########
src/java/org/apache/cassandra/repair/messages/SyncRequest.java:
##########
@@ -69,6 +76,22 @@ public SyncRequest(RepairJobDesc desc,
         this.asymmetric = asymmetric;
     }
 
+    public UUID deterministicId()

Review Comment:
   maybe we should add a sync request id instead of doing this
   
   It would change serialization, but we don't support repair between major 
versions anyway



##########
src/java/org/apache/cassandra/repair/messages/RepairMessage.java:
##########
@@ -42,57 +54,210 @@
  */
 public abstract class RepairMessage
 {
-    private static final CassandraVersion SUPPORTS_TIMEOUTS = new 
CassandraVersion("4.0.7-SNAPSHOT");
+    private enum ErrorHandling { NONE, TIMEOUT, RETRY }
+    private static final CassandraVersion SUPPORTS_RETRY = new 
CassandraVersion("5.1.0-SNAPSHOT");
+    private static final Map<Verb, CassandraVersion> VERB_TIMEOUT_VERSIONS;
+
+    static
+    {
+        CassandraVersion timeoutVersion = new 
CassandraVersion("4.0.7-SNAPSHOT");
+        EnumMap<Verb, CassandraVersion> map = new EnumMap<>(Verb.class);
+        map.put(Verb.VALIDATION_REQ, timeoutVersion);
+        map.put(Verb.SYNC_REQ, timeoutVersion);
+        map.put(Verb.VALIDATION_RSP, SUPPORTS_RETRY);
+        map.put(Verb.SYNC_RSP, SUPPORTS_RETRY);
+        VERB_TIMEOUT_VERSIONS = Collections.unmodifiableMap(map);
+    }
+    private static final Set<Verb> SUPPORTS_RETRY_WITHOUT_VERSION_CHECK = 
Collections.unmodifiableSet(EnumSet.of(Verb.CLEANUP_MSG));
+
     private static final Logger logger = 
LoggerFactory.getLogger(RepairMessage.class);
+    @Nullable
     public final RepairJobDesc desc;
 
-    protected RepairMessage(RepairJobDesc desc)
+    protected RepairMessage(@Nullable RepairJobDesc desc)
     {
         this.desc = desc;
     }
 
+    public TimeUUID parentRepairSession()
+    {
+        return desc.parentSessionId;
+    }
+
     public interface RepairFailureCallback
     {
         void onFailure(Exception e);
     }
 
-    public static void sendMessageWithFailureCB(RepairMessage request, Verb 
verb, InetAddressAndPort endpoint, RepairFailureCallback failureCallback)
+    private static Backoff backoff(SharedContext ctx, Verb verb)
     {
-        RequestCallback<?> callback = new RequestCallback<Object>()
+        RepairRetrySpec.Verb configVerb = toConfigVerb(verb);
+        if (configVerb == null)
+            return Backoff.None.INSTANCE;
+        RepairRetrySpec retrySpec = DatabaseDescriptor.getRepairRetrys();
+        RetrySpec spec = retrySpec.get(configVerb);
+        if (!spec.isEnabled())
+            return Backoff.None.INSTANCE;
+        switch (spec.type)
+        {
+            case Exponential:
+                return new Backoff.ExponentialBackoff(spec.maxAttempts.value, 
spec.baseSleepTime.toMilliseconds(), spec.maxSleepTime.toMilliseconds(), 
ctx.random().get()::nextDouble);
+            default:
+                throw new IllegalArgumentException("Unknown type: " + 
spec.type);
+        }
+    }
+
+    @Nullable
+    private static RepairRetrySpec.Verb toConfigVerb(Verb verb)
+    {
+        switch (verb)
+        {
+            case PREPARE_MSG: return RepairRetrySpec.Verb.PREPARE;
+            case VALIDATION_REQ: return RepairRetrySpec.Verb.VALIDATION_REQ;
+            case VALIDATION_RSP: return RepairRetrySpec.Verb.VALIDATION_RSP;
+            case SYNC_REQ: return RepairRetrySpec.Verb.SYNC_REQ;
+            case SYNC_RSP: return RepairRetrySpec.Verb.SYNC_RSP;
+            case SNAPSHOT_MSG: return RepairRetrySpec.Verb.SNAPSHOT;
+            case CLEANUP_MSG: return RepairRetrySpec.Verb.CLEANUP;
+            default: return null;
+        }
+    }
+
+    public interface AllowRetry
+    {
+        boolean test(InetAddressAndPort from, RequestFailureReason 
failureReason, int attempt);
+    }
+
+    public static AllowRetry notDone(Future<?> f)
+    {
+        return (i1, i2, i3) -> !f.isDone();
+    }
+
+    private static AllowRetry always()
+    {
+        return (i1, i2, i3) -> true;
+    }
+
+    public static <T> void sendMessageWithRetries(SharedContext ctx, 
AllowRetry allowRetry, RepairMessage request, Verb verb, InetAddressAndPort 
endpoint, RequestCallback<T> finalCallback)
+    {
+        sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, 
verb, endpoint, finalCallback, 0);
+    }
+
+    public static <T> void sendMessageWithRetries(SharedContext ctx, 
RepairMessage request, Verb verb, InetAddressAndPort endpoint, 
RequestCallback<T> finalCallback)
+    {
+        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, finalCallback, 0);
+    }
+
+    public static <T> void sendMessageWithRetries(SharedContext ctx, 
RepairMessage request, Verb verb, InetAddressAndPort endpoint)

Review Comment:
   `<T>` is unused



##########
src/java/org/apache/cassandra/service/ActiveRepairService.java:
##########
@@ -678,22 +670,60 @@ public boolean invokeOnFailure()
                 }
             }
         }
-        try
+        // implement timeout to bound the runtime of the future
+        long timeoutMillis = 
getRepairRetrys().isEnabled(RepairRetrySpec.Verb.PREPARE) ? 
getRepairRpcTimeout(MILLISECONDS)
+                                                                               
        : getRpcTimeout(MILLISECONDS);
+        ctx.optionalTasks().schedule(() -> {
+            if (promise.isDone())
+                return;
+            String errorMsg = "Did not get replies from all endpoints.";
+            if (promise.tryFailure(new RuntimeException(errorMsg)))
+                participateFailed(parentRepairSession, errorMsg);
+        }, timeoutMillis, MILLISECONDS);
+
+        return promise;
+    }
+
+    private void sendPrepareWithRetries(TimeUUID parentRepairSession,
+                                        AtomicInteger pending,
+                                        Set<String> failedNodes,
+                                        AsyncPromise<Void> promise,
+                                        InetAddressAndPort to,
+                                        RepairMessage msg)
+    {
+        RepairMessage.sendMessageWithRetries(ctx, notDone(promise), msg, 
PREPARE_MSG, to, new RequestCallback<>()
         {
-            if (!prepareLatch.await(getRpcTimeout(MILLISECONDS), MILLISECONDS) 
|| timeouts.get() > 0)
-                failRepair(parentRepairSession, "Did not get replies from all 
endpoints.");
-        }
-        catch (InterruptedException e)
-        {
-            failRepair(parentRepairSession, "Interrupted while waiting for 
prepare repair response.");
-        }
+            @Override
+            public void onResponse(Message<Object> msg)
+            {
+                ack();
+            }
 
-        if (!status.get())
-        {
-            failRepair(parentRepairSession, "Got negative replies from 
endpoints " + failedNodes);
-        }
+            @Override
+            public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
+            {
+                failedNodes.add(from.toString());
+                if (failureReason == RequestFailureReason.TIMEOUT)
+                {
+                    pending.set(-1);
+                    
promise.setFailure(failRepairException(parentRepairSession, "Did not get 
replies from all endpoints."));
+                }
+                else
+                {
+                    ack();
+                }
+            }
+
+            private void ack()
+            {
+                if (pending.decrementAndGet() == 0)
+                {
+                    if (failedNodes.isEmpty()) promise.setSuccess(null);

Review Comment:
   code style: the `if` body should go on its own line



##########
src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java:
##########
@@ -172,14 +194,23 @@ public void doVerb(final Message<RepairMessage> message)
                         ColumnFamilyStore store = 
ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily);
                         if (store == null)
                         {
-                            logger.error("Table {}.{} was dropped during 
validation phase of repair {}",
-                                         desc.keyspace, desc.columnFamily, 
desc.parentSessionId);
-                            vState.phase.fail(String.format("Table %s.%s was 
dropped", desc.keyspace, desc.columnFamily));
-                            
MessagingService.instance().send(Message.out(VALIDATION_RSP, new 
ValidationResponse(desc)), message.from());
+                            String msg = String.format("Table %s.% was dropped 
during validation phase of repair %s", desc.keyspace, desc.columnFamily, 
desc.parentSessionId);
+                            vState.phase.fail(msg);
+                            logErrorAndSendFailureResponse(msg, message);

Review Comment:
   should we send these `VALIDATION_RSP` messages with retries as well?



##########
src/java/org/apache/cassandra/repair/messages/RepairMessage.java:
##########
@@ -42,57 +54,210 @@
  */
 public abstract class RepairMessage
 {
-    private static final CassandraVersion SUPPORTS_TIMEOUTS = new 
CassandraVersion("4.0.7-SNAPSHOT");
+    private enum ErrorHandling { NONE, TIMEOUT, RETRY }
+    private static final CassandraVersion SUPPORTS_RETRY = new 
CassandraVersion("5.1.0-SNAPSHOT");
+    private static final Map<Verb, CassandraVersion> VERB_TIMEOUT_VERSIONS;
+
+    static
+    {
+        CassandraVersion timeoutVersion = new 
CassandraVersion("4.0.7-SNAPSHOT");
+        EnumMap<Verb, CassandraVersion> map = new EnumMap<>(Verb.class);
+        map.put(Verb.VALIDATION_REQ, timeoutVersion);
+        map.put(Verb.SYNC_REQ, timeoutVersion);
+        map.put(Verb.VALIDATION_RSP, SUPPORTS_RETRY);
+        map.put(Verb.SYNC_RSP, SUPPORTS_RETRY);
+        VERB_TIMEOUT_VERSIONS = Collections.unmodifiableMap(map);
+    }
+    private static final Set<Verb> SUPPORTS_RETRY_WITHOUT_VERSION_CHECK = 
Collections.unmodifiableSet(EnumSet.of(Verb.CLEANUP_MSG));
+
     private static final Logger logger = 
LoggerFactory.getLogger(RepairMessage.class);
+    @Nullable
     public final RepairJobDesc desc;
 
-    protected RepairMessage(RepairJobDesc desc)
+    protected RepairMessage(@Nullable RepairJobDesc desc)
     {
         this.desc = desc;
     }
 
+    public TimeUUID parentRepairSession()
+    {
+        return desc.parentSessionId;
+    }
+
     public interface RepairFailureCallback
     {
         void onFailure(Exception e);
     }
 
-    public static void sendMessageWithFailureCB(RepairMessage request, Verb 
verb, InetAddressAndPort endpoint, RepairFailureCallback failureCallback)
+    private static Backoff backoff(SharedContext ctx, Verb verb)
     {
-        RequestCallback<?> callback = new RequestCallback<Object>()
+        RepairRetrySpec.Verb configVerb = toConfigVerb(verb);
+        if (configVerb == null)
+            return Backoff.None.INSTANCE;
+        RepairRetrySpec retrySpec = DatabaseDescriptor.getRepairRetrys();
+        RetrySpec spec = retrySpec.get(configVerb);
+        if (!spec.isEnabled())
+            return Backoff.None.INSTANCE;
+        switch (spec.type)
+        {
+            case Exponential:
+                return new Backoff.ExponentialBackoff(spec.maxAttempts.value, 
spec.baseSleepTime.toMilliseconds(), spec.maxSleepTime.toMilliseconds(), 
ctx.random().get()::nextDouble);
+            default:
+                throw new IllegalArgumentException("Unknown type: " + 
spec.type);
+        }
+    }
+
+    @Nullable
+    private static RepairRetrySpec.Verb toConfigVerb(Verb verb)
+    {
+        switch (verb)
+        {
+            case PREPARE_MSG: return RepairRetrySpec.Verb.PREPARE;
+            case VALIDATION_REQ: return RepairRetrySpec.Verb.VALIDATION_REQ;
+            case VALIDATION_RSP: return RepairRetrySpec.Verb.VALIDATION_RSP;
+            case SYNC_REQ: return RepairRetrySpec.Verb.SYNC_REQ;
+            case SYNC_RSP: return RepairRetrySpec.Verb.SYNC_RSP;
+            case SNAPSHOT_MSG: return RepairRetrySpec.Verb.SNAPSHOT;
+            case CLEANUP_MSG: return RepairRetrySpec.Verb.CLEANUP;
+            default: return null;
+        }
+    }
+
+    public interface AllowRetry
+    {
+        boolean test(InetAddressAndPort from, RequestFailureReason 
failureReason, int attempt);

Review Comment:
   these parameters are not used anywhere
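
   One possible simplification, sketched under the assumption that no caller ever needs `from`, `failureReason`, or `attempt`: both `notDone` and `always` ignore their arguments, so a plain `BooleanSupplier` would express the same thing. The class name is hypothetical and `java.util.concurrent.Future` stands in for Cassandra's own `Future` type:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BooleanSupplier;

final class AllowRetrySketch
{
    // Retry only while the overall operation has not completed.
    static BooleanSupplier notDone(Future<?> f)
    {
        return () -> !f.isDone();
    }

    // Retry unconditionally (the attempt limit is enforced elsewhere).
    static BooleanSupplier always()
    {
        return () -> true;
    }

    public static void main(String[] args)
    {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        BooleanSupplier allowRetry = notDone(promise);
        System.out.println(allowRetry.getAsBoolean());
        promise.complete(null);
        System.out.println(allowRetry.getAsBoolean());
    }
}
```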



##########
src/java/org/apache/cassandra/config/RetrySpec.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+import java.util.Objects;
+
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.config.DurationSpec.LongMillisecondsBound;
+
+public class RetrySpec
+{
+    public enum Type { Exponential }

Review Comment:
   only one Type, we should not abstract this until we actually need to.
   
   I understand that the end goal might be to have more than one type, but to 
simplify this patch we should not add abstractions we don't use. If we want to 
refactor all different kinds of retries we have in the db in the future, that 
should be done in a separate patch



##########
src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java:
##########
@@ -41,16 +42,24 @@
 public class CassandraTableRepairManager implements TableRepairManager
 {
     private final ColumnFamilyStore cfs;
+    private final SharedContext ctx;
 
     public CassandraTableRepairManager(ColumnFamilyStore cfs)
     {
         this.cfs = cfs;
+        this.ctx = SharedContext.Global.instance;

Review Comment:
   `this(cfs, SharedContext.Global.instance);`
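
   A minimal sketch of the delegation, assuming a two-argument constructor exists alongside this one. All types here are hypothetical stand-ins (`String` for `ColumnFamilyStore`, `Context` for `SharedContext`):

```java
final class TableRepairManagerSketch
{
    interface Context {}                              // stand-in for SharedContext
    static final Context GLOBAL = new Context() {};   // stand-in for SharedContext.Global.instance

    private final String table;                       // stand-in for ColumnFamilyStore
    private final Context ctx;

    TableRepairManagerSketch(String table)
    {
        // Delegate so field assignment lives in exactly one constructor.
        this(table, GLOBAL);
    }

    TableRepairManagerSketch(String table, Context ctx)
    {
        this.table = table;
        this.ctx = ctx;
    }

    Context context()
    {
        return ctx;
    }

    String table()
    {
        return table;
    }
}
```

   Delegating keeps the default wiring in one place, so a test can pass its own context without the two constructors drifting apart.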



##########
src/java/org/apache/cassandra/config/RepairRetrySpec.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+public class RepairRetrySpec extends RetrySpec

Review Comment:
   We only use this for repair, can we remove this abstraction?



##########
src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java:
##########
@@ -172,14 +194,23 @@ public void doVerb(final Message<RepairMessage> message)
                         ColumnFamilyStore store = 
ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily);
                         if (store == null)
                         {
-                            logger.error("Table {}.{} was dropped during 
validation phase of repair {}",
-                                         desc.keyspace, desc.columnFamily, 
desc.parentSessionId);
-                            vState.phase.fail(String.format("Table %s.%s was 
dropped", desc.keyspace, desc.columnFamily));
-                            
MessagingService.instance().send(Message.out(VALIDATION_RSP, new 
ValidationResponse(desc)), message.from());
+                            String msg = String.format("Table %s.% was dropped during validation phase of repair %s", desc.keyspace, desc.columnFamily, desc.parentSessionId);

Review Comment:
   `%s.%` should be `%s.%s`
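
   To illustrate why this matters: `% w` should be parsed as a format specifier with an unknown conversion, so `String.format` is expected to throw an `IllegalFormatException` rather than just produce a slightly wrong message. A standalone sketch (the class and helper names are hypothetical, the argument values are placeholders):

```java
import java.util.IllegalFormatException;

final class FormatStringSketch
{
    // Hypothetical helper: returns the formatted message, or null if the pattern is malformed.
    static String tryFormat(String pattern, Object... args)
    {
        try
        {
            return String.format(pattern, args);
        }
        catch (IllegalFormatException e)
        {
            return null;
        }
    }

    public static void main(String[] args)
    {
        String broken = "Table %s.% was dropped during validation phase of repair %s";
        String fixed = "Table %s.%s was dropped during validation phase of repair %s";
        System.out.println(tryFormat(broken, "ks", "tbl", "session-1"));
        System.out.println(tryFormat(fixed, "ks", "tbl", "session-1"));
    }
}
```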



##########
conf/cassandra.yaml:
##########
@@ -723,6 +723,24 @@ memtable_allocation_type: heap_buffers
 # Min unit: MiB
 # repair_session_space:
 
+# Configure the retries for each of the repair messages that support it.
+#
+# For more details see https://issues.apache.org/jira/browse/CASSANDRA-18816
+#
+# repair:
+#   retries:
+#     type: Exponential
+#     max_attempts: 3
+#     base_sleep_time: 200ms
+#     max_sleep_time: 1s
+#     verbs:
+#       # Increase the timeout of validation responses due to them containing the merkle tree
+#       VALIDATION_RSP:

Review Comment:
   In general I doubt we need to be able to override attempts/sleep times etc. on a per-message basis - I think we can retry equally for each message, with the exception of `VALIDATION_RSP`, which obviously might need more time.
   
   so basically something like (not sure what is more obvious to users, 
`merkle_tree_...` or `validation_...`):
   ```
   repair:
      max_attempts: 3
      base_sleep_time: 200ms
      max_sleep_time: 1s
      merkle_tree_response_max_attempts: 3
      ...
   ```
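
   A rough sketch of how such flat settings could drive the retry delay, assuming plain exponential backoff capped at `max_sleep_time`. The class and method names are hypothetical, and the random jitter that the patch passes to its backoff is left out to keep the arithmetic visible:

```java
final class RepairRetrySketch
{
    final int maxAttempts;
    final long baseSleepMillis;
    final long maxSleepMillis;

    RepairRetrySketch(int maxAttempts, long baseSleepMillis, long maxSleepMillis)
    {
        this.maxAttempts = maxAttempts;
        this.baseSleepMillis = baseSleepMillis;
        this.maxSleepMillis = maxSleepMillis;
    }

    // Whether another retry is allowed after `attempt` retries have already been made.
    boolean shouldRetry(int attempt)
    {
        return attempt < maxAttempts;
    }

    // Delay before retry number `attempt` (0-based): base * 2^attempt, capped at max.
    long waitMillis(int attempt)
    {
        // Bound the shift so the doubling cannot overflow for realistic base values.
        int shift = Math.min(Math.max(attempt, 0), 30);
        return Math.min(maxSleepMillis, baseSleepMillis << shift);
    }

    public static void main(String[] args)
    {
        // Values taken from the example config: 3 attempts, 200ms base, 1s max.
        RepairRetrySketch retry = new RepairRetrySketch(3, 200, 1000);
        for (int attempt = 0; retry.shouldRetry(attempt); attempt++)
            System.out.println("attempt " + attempt + " waits " + retry.waitMillis(attempt) + "ms");
    }
}
```

   With the example values the delays are 200ms, 400ms, and 800ms; a fourth retry would be capped at 1s. A separate instance built from the `merkle_tree_response_...` settings would cover the validation response.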



##########
src/java/org/apache/cassandra/service/ActiveRepairService.java:
##########
@@ -150,28 +144,43 @@
 @Simulate(with = MONITORS)
 public class ActiveRepairService implements IEndpointStateChangeSubscriber, 
IFailureDetectionEventListener, ActiveRepairServiceMBean
 {
+
     public enum ParentRepairStatus
     {
         IN_PROGRESS, COMPLETED, FAILED
     }
 
-    public static class ConsistentSessions
+    public class ConsistentSessions

Review Comment:
   this can still be static



##########
src/java/org/apache/cassandra/service/ActiveRepairService.java:
##########
@@ -614,62 +632,36 @@ public static boolean verifyCompactionsPendingThreshold(TimeUUID parentRepairSes
         return true;
     }
 
-    public TimeUUID prepareForRepair(TimeUUID parentRepairSession, 
InetAddressAndPort coordinator, Set<InetAddressAndPort> endpoints, RepairOption 
options, boolean isForcedRepair, List<ColumnFamilyStore> columnFamilyStores)
+    public Future<?> prepareForRepair(TimeUUID parentRepairSession, 
InetAddressAndPort coordinator, Set<InetAddressAndPort> endpoints, RepairOption 
options, boolean isForcedRepair, List<ColumnFamilyStore> columnFamilyStores)
     {
         if (!verifyCompactionsPendingThreshold(parentRepairSession, 
options.getPreviewKind()))
             failRepair(parentRepairSession, "Rejecting incoming repair, 
pending compactions above threshold"); // failRepair throws exception
 
         long repairedAt = getRepairedAt(options, isForcedRepair);
         registerParentRepairSession(parentRepairSession, coordinator, 
columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, 
options.isGlobal(), options.getPreviewKind());
-        final CountDownLatch prepareLatch = 
newCountDownLatch(endpoints.size());
-        final AtomicBoolean status = new AtomicBoolean(true);
-        final Set<String> failedNodes = synchronizedSet(new HashSet<String>());
-        final AtomicInteger timeouts = new AtomicInteger(0);
-        RequestCallback callback = new RequestCallback()
-        {
-            @Override
-            public void onResponse(Message msg)
-            {
-                prepareLatch.decrement();
-            }
-
-            @Override
-            public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
-            {
-                status.set(false);
-                failedNodes.add(from.toString());
-                if (failureReason == RequestFailureReason.TIMEOUT)
-                    timeouts.incrementAndGet();
-                prepareLatch.decrement();
-            }
-
-            @Override
-            public boolean invokeOnFailure()
-            {
-                return true;
-            }
-        };
+        final AtomicInteger pending = new AtomicInteger(endpoints.size());

Review Comment:
   we can drop the `final` keyword on these



##########
src/java/org/apache/cassandra/repair/messages/RepairMessage.java:
##########
@@ -42,57 +54,210 @@
  */
 public abstract class RepairMessage
 {
-    private static final CassandraVersion SUPPORTS_TIMEOUTS = new 
CassandraVersion("4.0.7-SNAPSHOT");
+    private enum ErrorHandling { NONE, TIMEOUT, RETRY }
+    private static final CassandraVersion SUPPORTS_RETRY = new 
CassandraVersion("5.1.0-SNAPSHOT");
+    private static final Map<Verb, CassandraVersion> VERB_TIMEOUT_VERSIONS;
+
+    static
+    {
+        CassandraVersion timeoutVersion = new 
CassandraVersion("4.0.7-SNAPSHOT");
+        EnumMap<Verb, CassandraVersion> map = new EnumMap<>(Verb.class);
+        map.put(Verb.VALIDATION_REQ, timeoutVersion);
+        map.put(Verb.SYNC_REQ, timeoutVersion);
+        map.put(Verb.VALIDATION_RSP, SUPPORTS_RETRY);
+        map.put(Verb.SYNC_RSP, SUPPORTS_RETRY);
+        VERB_TIMEOUT_VERSIONS = Collections.unmodifiableMap(map);
+    }
+    private static final Set<Verb> SUPPORTS_RETRY_WITHOUT_VERSION_CHECK = 
Collections.unmodifiableSet(EnumSet.of(Verb.CLEANUP_MSG));
+
     private static final Logger logger = 
LoggerFactory.getLogger(RepairMessage.class);
+    @Nullable
     public final RepairJobDesc desc;
 
-    protected RepairMessage(RepairJobDesc desc)
+    protected RepairMessage(@Nullable RepairJobDesc desc)
     {
         this.desc = desc;
     }
 
+    public TimeUUID parentRepairSession()
+    {
+        return desc.parentSessionId;
+    }
+
     public interface RepairFailureCallback
     {
         void onFailure(Exception e);
     }
 
-    public static void sendMessageWithFailureCB(RepairMessage request, Verb 
verb, InetAddressAndPort endpoint, RepairFailureCallback failureCallback)
+    private static Backoff backoff(SharedContext ctx, Verb verb)
     {
-        RequestCallback<?> callback = new RequestCallback<Object>()
+        RepairRetrySpec.Verb configVerb = toConfigVerb(verb);
+        if (configVerb == null)
+            return Backoff.None.INSTANCE;
+        RepairRetrySpec retrySpec = DatabaseDescriptor.getRepairRetrys();
+        RetrySpec spec = retrySpec.get(configVerb);
+        if (!spec.isEnabled())
+            return Backoff.None.INSTANCE;
+        switch (spec.type)
+        {
+            case Exponential:
+                return new Backoff.ExponentialBackoff(spec.maxAttempts.value, 
spec.baseSleepTime.toMilliseconds(), spec.maxSleepTime.toMilliseconds(), 
ctx.random().get()::nextDouble);
+            default:
+                throw new IllegalArgumentException("Unknown type: " + 
spec.type);
+        }
+    }
+
+    @Nullable
+    private static RepairRetrySpec.Verb toConfigVerb(Verb verb)
+    {
+        switch (verb)
+        {
+            case PREPARE_MSG: return RepairRetrySpec.Verb.PREPARE;
+            case VALIDATION_REQ: return RepairRetrySpec.Verb.VALIDATION_REQ;
+            case VALIDATION_RSP: return RepairRetrySpec.Verb.VALIDATION_RSP;
+            case SYNC_REQ: return RepairRetrySpec.Verb.SYNC_REQ;
+            case SYNC_RSP: return RepairRetrySpec.Verb.SYNC_RSP;
+            case SNAPSHOT_MSG: return RepairRetrySpec.Verb.SNAPSHOT;
+            case CLEANUP_MSG: return RepairRetrySpec.Verb.CLEANUP;
+            default: return null;
+        }
+    }
+
+    public interface AllowRetry
+    {
+        boolean test(InetAddressAndPort from, RequestFailureReason 
failureReason, int attempt);
+    }
+
+    public static AllowRetry notDone(Future<?> f)
+    {
+        return (i1, i2, i3) -> !f.isDone();
+    }
+
+    private static AllowRetry always()
+    {
+        return (i1, i2, i3) -> true;
+    }
+
+    public static <T> void sendMessageWithRetries(SharedContext ctx, 
AllowRetry allowRetry, RepairMessage request, Verb verb, InetAddressAndPort 
endpoint, RequestCallback<T> finalCallback)
+    {
+        sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, 
verb, endpoint, finalCallback, 0);
+    }
+
+    public static <T> void sendMessageWithRetries(SharedContext ctx, 
RepairMessage request, Verb verb, InetAddressAndPort endpoint, 
RequestCallback<T> finalCallback)
+    {
+        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, finalCallback, 0);
+    }
+
+    public static <T> void sendMessageWithRetries(SharedContext ctx, 
RepairMessage request, Verb verb, InetAddressAndPort endpoint)
+    {
+        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, new RequestCallback<>()
         {
             @Override
             public void onResponse(Message<Object> msg)
             {
-                logger.info("[#{}] {} received by {}", 
request.desc.parentSessionId, verb, endpoint);
-                // todo: at some point we should make repair messages follow 
the normal path, actually using this
+            }
+
+            @Override
+            public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
+            {
+            }
+        }, 0);
+    }
+
+    private static <T> void sendMessageWithRetries(SharedContext ctx, Backoff 
backoff, AllowRetry allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint, RequestCallback<T> finalCallback, int attempt)
+    {
+        RequestCallback<T> callback = new RequestCallback<>()
+        {
+            @Override
+            public void onResponse(Message<T> msg)
+            {
+                finalCallback.onResponse(msg);
+            }
+
+            @Override
+            public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
+            {
+                ErrorHandling allowed = errorHandlingSupported(ctx, endpoint, 
verb, request.parentRepairSession());
+                switch (allowed)
+                {
+                    case NONE:
+                        logger.error("[#{}] {} failed on {}: {}", 
request.parentRepairSession(), verb, from, failureReason);
+                        return;
+                    case TIMEOUT:
+                        finalCallback.onFailure(from, failureReason);
+                        return;
+                    case RETRY:
+                        int maxAttempts = backoff.maxAttempts();
+                        if (failureReason == RequestFailureReason.TIMEOUT && 
attempt < maxAttempts && allowRetry.test(from, failureReason, attempt))
+                        {
+                            ctx.optionalTasks().schedule(() -> 
sendMessageWithRetries(ctx, backoff, allowRetry, request, verb, endpoint, 
finalCallback, attempt + 1),
+                                                         
backoff.computeWaitTime(attempt), backoff.unit());
+                            return;
+                        }
+                        finalCallback.onFailure(from, failureReason);
+                        return;
+                    default:
+                        throw new AssertionError("Unknown error handler: " + 
allowed);
+                }
             }
 
             @Override
             public boolean invokeOnFailure()
             {
                 return true;
             }
+        };
+        ctx.messaging().sendWithCallback(Message.outWithFlag(verb, request, 
CALL_BACK_ON_FAILURE),
+                                         endpoint,
+                                         callback);
+    }
 
+    public static void sendMessageWithFailureCB(SharedContext ctx, AllowRetry 
allowRetry, RepairMessage request, Verb verb, InetAddressAndPort endpoint, 
RepairFailureCallback failureCallback)
+    {
+        RequestCallback<?> callback = new RequestCallback<>()
+        {
+            @Override
+            public void onResponse(Message<Object> msg)
+            {
+                logger.info("[#{}] {} received by {}", 
request.parentRepairSession(), verb, endpoint);
+                // todo: at some point we should make repair messages follow 
the normal path, actually using this
+            }
+
+            @Override
             public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
             {
-                logger.error("[#{}] {} failed on {}: {}", 
request.desc.parentSessionId, verb, from, failureReason);
+                failureCallback.onFailure(RepairException.error(request.desc, 
PreviewKind.NONE, String.format("Got %s failure from %s: %s", verb, from, 
failureReason)));
+            }
 
-                if (supportsTimeouts(from, request.desc.parentSessionId))
-                    
failureCallback.onFailure(RepairException.error(request.desc, PreviewKind.NONE, 
String.format("Got %s failure from %s: %s", verb, from, failureReason)));
+            @Override
+            public boolean invokeOnFailure()
+            {
+                return true;
             }
         };
-
-        MessagingService.instance().sendWithCallback(Message.outWithFlag(verb, 
request, CALL_BACK_ON_FAILURE),
-                                                     endpoint,
-                                                     callback);
+        sendMessageWithRetries(ctx, allowRetry, request, verb, endpoint, 
callback);
     }
 
-    private static boolean supportsTimeouts(InetAddressAndPort from, TimeUUID 
parentSessionId)
+    private static ErrorHandling errorHandlingSupported(SharedContext ctx, 
InetAddressAndPort from, Verb verb, TimeUUID parentSessionId)
     {
-        CassandraVersion remoteVersion = 
Gossiper.instance.getReleaseVersion(from);
-        if (remoteVersion != null && 
remoteVersion.compareTo(SUPPORTS_TIMEOUTS) >= 0)
-            return true;
-        logger.warn("[#{}] Not failing repair due to remote host {} not 
supporting repair message timeouts (version = {})", parentSessionId, from, 
remoteVersion);
-        return false;
+        if (SUPPORTS_RETRY_WITHOUT_VERSION_CHECK.contains(verb))
+            return ErrorHandling.RETRY;
+        // Repair in mixed mode isn't fully supported, but also not actively 
blocked... so in the common case all participants
+        // will be on the same version as this instance, so can avoid the 
lookup from gossip
+        CassandraVersion remoteVersion = 
ctx.gossiper().getReleaseVersion(from);
+        if (remoteVersion == null)
+        {
+            if (VERB_TIMEOUT_VERSIONS.containsKey(verb))
+            {
+                logger.warn("[#{}] Not failing repair due to remote host {} 
not supporting repair message timeouts (version = {})", parentSessionId, from, 
remoteVersion);

Review Comment:
   `remoteVersion` is always null here
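
A minimal sketch of one way to address this, assuming the intent is simply to avoid printing `(version = null)`. The class and helper names are hypothetical, and `String.format` stands in for the logger call. It is not code from the PR.

```java
public class RemoteVersionMessageSketch
{
    // Hypothetical helper, not part of the PR: builds the warning text for a peer
    // whose release version may be missing from gossip.
    public static String timeoutUnsupportedMessage(String sessionId, String host, String remoteVersion)
    {
        // In the null branch there is no version to report, so the placeholder is dropped.
        if (remoteVersion == null)
            return String.format("[#%s] Not failing repair due to remote host %s having no known release version",
                                 sessionId, host);
        // Otherwise keep the original wording, including the version.
        return String.format("[#%s] Not failing repair due to remote host %s not supporting repair message timeouts (version = %s)",
                             sessionId, host, remoteVersion);
    }

    public static void main(String[] args)
    {
        System.out.println(timeoutUnsupportedMessage("abc", "127.0.0.2", null));
        System.out.println(timeoutUnsupportedMessage("abc", "127.0.0.2", "4.0.0"));
    }
}
```

In the actual method the same effect could be had by removing the `(version = {})` placeholder and the `remoteVersion` argument from the `logger.warn` call inside the `remoteVersion == null` branch.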



##########
src/java/org/apache/cassandra/service/StorageServiceMBean.java:
##########
@@ -477,7 +478,7 @@ default int upgradeSSTables(String keyspaceName, boolean 
excludeCurrentVersion,
     /**
      * Get the status of a given parent repair session.
      * @param cmd the int reference returned when issuing the repair
-     * @return status of parent repair from enum {@link 
org.apache.cassandra.repair.RepairRunnable.Status}
+     * @return status of parent repair from enum {@link 
RepairCoordinator.Status}

Review Comment:
   this should be `ParentRepairStatus`



##########
src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java:
##########
@@ -172,14 +194,23 @@ public void doVerb(final Message<RepairMessage> message)
                         ColumnFamilyStore store = 
ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily);
                         if (store == null)
                         {
-                            logger.error("Table {}.{} was dropped during 
validation phase of repair {}",
-                                         desc.keyspace, desc.columnFamily, 
desc.parentSessionId);
-                            vState.phase.fail(String.format("Table %s.%s was 
dropped", desc.keyspace, desc.columnFamily));
-                            
MessagingService.instance().send(Message.out(VALIDATION_RSP, new 
ValidationResponse(desc)), message.from());
+                            String msg = String.format("Table %s.%s was dropped 
during validation phase of repair %s", desc.keyspace, desc.columnFamily, 
desc.parentSessionId);
+                            vState.phase.fail(msg);
+                            logErrorAndSendFailureResponse(msg, message);
+                            ctx.messaging().send(Message.out(VALIDATION_RSP, 
new ValidationResponse(desc)), message.from());
                             return;
                         }
 
-                        
ActiveRepairService.instance.consistent.local.maybeSetRepairing(desc.parentSessionId);
+                        try
+                        {
+                            
ctx.repair().consistent.local.maybeSetRepairing(desc.parentSessionId);
+                        }
+                        catch (Throwable t)
+                        {
+                            JVMStabilityInspector.inspectThrowable(t);
+                            logErrorAndSendFailureResponse(t.toString(), 
message);
+                            ctx.messaging().send(Message.out(VALIDATION_RSP, 
new ValidationResponse(desc)), message.from());

Review Comment:
   this also sends both a failure response and an empty VALIDATION_RSP; if 
that's needed, we probably need a comment where we do this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

