maedhroz commented on code in PR #2732:
URL: https://github.com/apache/cassandra/pull/2732#discussion_r1337828415


##########
src/java/org/apache/cassandra/repair/messages/RepairMessage.java:
##########
@@ -42,57 +55,182 @@
  */
 public abstract class RepairMessage
 {
-    private static final CassandraVersion SUPPORTS_TIMEOUTS = new 
CassandraVersion("4.0.7-SNAPSHOT");
+    private enum ErrorHandling { NONE, TIMEOUT, RETRY }
+    private static final CassandraVersion SUPPORTS_RETRY = new 
CassandraVersion("5.0.0-alpha2.SNAPSHOT");
+    private static final Map<Verb, CassandraVersion> VERB_TIMEOUT_VERSIONS;
+
+    static
+    {
+        CassandraVersion timeoutVersion = new 
CassandraVersion("4.0.7-SNAPSHOT");
+        EnumMap<Verb, CassandraVersion> map = new EnumMap<>(Verb.class);
+        map.put(Verb.VALIDATION_REQ, timeoutVersion);
+        map.put(Verb.SYNC_REQ, timeoutVersion);
+        map.put(Verb.VALIDATION_RSP, SUPPORTS_RETRY);
+        map.put(Verb.SYNC_RSP, SUPPORTS_RETRY);
+        VERB_TIMEOUT_VERSIONS = Collections.unmodifiableMap(map);
+    }
+    private static final Set<Verb> SUPPORTS_RETRY_WITHOUT_VERSION_CHECK = 
Collections.unmodifiableSet(EnumSet.of(Verb.CLEANUP_MSG));
+
     private static final Logger logger = 
LoggerFactory.getLogger(RepairMessage.class);
+    @Nullable
     public final RepairJobDesc desc;
 
-    protected RepairMessage(RepairJobDesc desc)
+    protected RepairMessage(@Nullable RepairJobDesc desc)
     {
         this.desc = desc;
     }
 
+    public TimeUUID parentRepairSession()
+    {
+        return desc.parentSessionId;
+    }
+
     public interface RepairFailureCallback
     {
         void onFailure(Exception e);
     }
 
-    public static void sendMessageWithFailureCB(RepairMessage request, Verb 
verb, InetAddressAndPort endpoint, RepairFailureCallback failureCallback)
+    private static Backoff backoff(SharedContext ctx, Verb verb)
+    {
+        RepairRetrySpec retrySpec = DatabaseDescriptor.getRepairRetrySpec();
+        RetrySpec spec = verb == Verb.VALIDATION_RSP ? 
retrySpec.getMerkleTreeResponseSpec() : retrySpec;
+        if (!spec.isEnabled())
+            return Backoff.None.INSTANCE;
+        return new Backoff.ExponentialBackoff(spec.maxAttempts.value, 
spec.baseSleepTime.toMilliseconds(), spec.maxSleepTime.toMilliseconds(), 
ctx.random().get()::nextDouble);
+    }
+
+    public static Supplier<Boolean> notDone(Future<?> f)
     {
-        RequestCallback<?> callback = new RequestCallback<Object>()
+        return () -> !f.isDone();
+    }
+
+    private static Supplier<Boolean> always()
+    {
+        return () -> true;
+    }
+
+    public static <T> void sendMessageWithRetries(SharedContext ctx, 
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint, RequestCallback<T> finalCallback)
+    {
+        sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, 
verb, endpoint, finalCallback, 0);
+    }
+
+    public static <T> void sendMessageWithRetries(SharedContext ctx, 
RepairMessage request, Verb verb, InetAddressAndPort endpoint, 
RequestCallback<T> finalCallback)
+    {
+        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, finalCallback, 0);
+    }
+
+    public static void sendMessageWithRetries(SharedContext ctx, RepairMessage 
request, Verb verb, InetAddressAndPort endpoint)
+    {
+        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, new RequestCallback<>()
         {
             @Override
             public void onResponse(Message<Object> msg)
             {
-                logger.info("[#{}] {} received by {}", 
request.desc.parentSessionId, verb, endpoint);
-                // todo: at some point we should make repair messages follow 
the normal path, actually using this
+            }
+
+            @Override
+            public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
+            {
+            }
+        }, 0);
+    }
+
+    private static <T> void sendMessageWithRetries(SharedContext ctx, Backoff 
backoff, Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint, RequestCallback<T> finalCallback, int attempt)
+    {
+        RequestCallback<T> callback = new RequestCallback<>()
+        {
+            @Override
+            public void onResponse(Message<T> msg)
+            {
+                finalCallback.onResponse(msg);
+            }
+
+            @Override
+            public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
+            {
+                ErrorHandling allowed = errorHandlingSupported(ctx, endpoint, 
verb, request.parentRepairSession());
+                switch (allowed)
+                {
+                    case NONE:
+                        logger.error("[#{}] {} failed on {}: {}", 
request.parentRepairSession(), verb, from, failureReason);
+                        return;
+                    case TIMEOUT:
+                        finalCallback.onFailure(from, failureReason);
+                        return;
+                    case RETRY:
+                        int maxAttempts = backoff.maxAttempts();
+                        if (failureReason == RequestFailureReason.TIMEOUT && 
attempt < maxAttempts && allowRetry.get())
+                        {
+                            ctx.optionalTasks().schedule(() -> 
sendMessageWithRetries(ctx, backoff, allowRetry, request, verb, endpoint, 
finalCallback, attempt + 1),
+                                                         
backoff.computeWaitTime(attempt), backoff.unit());
+                            return;
+                        }
+                        finalCallback.onFailure(from, failureReason);
+                        return;
+                    default:
+                        throw new AssertionError("Unknown error handler: " + 
allowed);
+                }
             }
 
             @Override
             public boolean invokeOnFailure()
             {
                 return true;
             }
+        };
+        ctx.messaging().sendWithCallback(Message.outWithFlag(verb, request, 
CALL_BACK_ON_FAILURE),
+                                         endpoint,
+                                         callback);
+    }
 
+    public static void sendMessageWithFailureCB(SharedContext ctx, 
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint, RepairFailureCallback failureCallback)
+    {
+        RequestCallback<?> callback = new RequestCallback<>()
+        {
+            @Override
+            public void onResponse(Message<Object> msg)
+            {
+                logger.info("[#{}] {} received by {}", 
request.parentRepairSession(), verb, endpoint);
+                // todo: at some point we should make repair messages follow 
the normal path, actually using this
+            }
+
+            @Override
             public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
             {
-                logger.error("[#{}] {} failed on {}: {}", 
request.desc.parentSessionId, verb, from, failureReason);
+                failureCallback.onFailure(RepairException.error(request.desc, 
PreviewKind.NONE, String.format("Got %s failure from %s: %s", verb, from, 
failureReason)));
+            }
 
-                if (supportsTimeouts(from, request.desc.parentSessionId))
-                    
failureCallback.onFailure(RepairException.error(request.desc, PreviewKind.NONE, 
String.format("Got %s failure from %s: %s", verb, from, failureReason)));
+            @Override
+            public boolean invokeOnFailure()
+            {
+                return true;
             }
         };
-
-        MessagingService.instance().sendWithCallback(Message.outWithFlag(verb, 
request, CALL_BACK_ON_FAILURE),
-                                                     endpoint,
-                                                     callback);
+        sendMessageWithRetries(ctx, allowRetry, request, verb, endpoint, 
callback);
     }
 
-    private static boolean supportsTimeouts(InetAddressAndPort from, TimeUUID 
parentSessionId)
+    private static ErrorHandling errorHandlingSupported(SharedContext ctx, 
InetAddressAndPort from, Verb verb, TimeUUID parentSessionId)
     {
-        CassandraVersion remoteVersion = 
Gossiper.instance.getReleaseVersion(from);
-        if (remoteVersion != null && 
remoteVersion.compareTo(SUPPORTS_TIMEOUTS) >= 0)
-            return true;
-        logger.warn("[#{}] Not failing repair due to remote host {} not 
supporting repair message timeouts (version = {})", parentSessionId, from, 
remoteVersion);
-        return false;
+        if (SUPPORTS_RETRY_WITHOUT_VERSION_CHECK.contains(verb))
+            return ErrorHandling.RETRY;
+        // Repair in mixed mode isn't fully supported, but also not activally 
blocked... so in the common case all participants
+        // will be on the same version as this instance, so can avoid the 
lookup from gossip
+        CassandraVersion remoteVersion = 
ctx.gossiper().getReleaseVersion(from);
+        if (remoteVersion == null)
+        {
+            if (VERB_TIMEOUT_VERSIONS.containsKey(verb))
+            {
+                logger.warn("[#{}] Not failing repair due to remote host {} 
not supporting repair message timeouts (version is unknown)", parentSessionId, 
from);
+                return ErrorHandling.NONE;
+            }
+            return ErrorHandling.TIMEOUT;
+        }
+        if (remoteVersion.compareTo(SUPPORTS_RETRY) >= 0)
+            return ErrorHandling.RETRY;
+        CassandraVersion timeoutVersion = VERB_TIMEOUT_VERSIONS.get(verb);
+        if (timeoutVersion == null ||
+            remoteVersion.compareTo(timeoutVersion) >= 0)

Review Comment:
   nit: It might be nice for this condition to be on one line. I have to do a
   double-take with the `return` having the same indentation as the wrapped line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to