dcapwell commented on code in PR #3491:
URL: https://github.com/apache/cassandra/pull/3491#discussion_r1733282841


##########
src/java/org/apache/cassandra/repair/messages/RepairMessage.java:
##########
@@ -155,98 +153,94 @@ public static Supplier<Boolean> always()
 
     public static <T> void sendMessageWithRetries(SharedContext ctx, 
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint, RequestCallback<T> finalCallback)
     {
-        sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, 
verb, endpoint, finalCallback, 0);
+        sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, 
verb, endpoint, finalCallback);
     }
 
     public static <T> void sendMessageWithRetries(SharedContext ctx, 
RepairMessage request, Verb verb, InetAddressAndPort endpoint, 
RequestCallback<T> finalCallback)
     {
-        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, finalCallback, 0);
+        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, finalCallback);
     }
 
     public static void sendMessageWithRetries(SharedContext ctx, RepairMessage 
request, Verb verb, InetAddressAndPort endpoint)
     {
-        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, NOOP_CALLBACK, 0);
+        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, NOOP_CALLBACK);
     }
 
     public static void sendMessageWithRetries(SharedContext ctx, 
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint)
     {
-        sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, 
verb, endpoint, NOOP_CALLBACK, 0);
+        sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, 
verb, endpoint, NOOP_CALLBACK);
     }
 
     @VisibleForTesting
-    static <T> void sendMessageWithRetries(SharedContext ctx, Backoff backoff, 
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint, RequestCallback<T> finalCallback, int attempt)
+    static <T> void sendMessageWithRetries(SharedContext ctx, Backoff backoff, 
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint, RequestCallback<T> finalCallback)
     {
         if (!ALLOWS_RETRY.contains(verb))
             throw new AssertionError("Repair verb " + verb + " does not 
support retry, but a request to send with retry was given!");
-        RequestCallback<T> callback = new RequestCallback<>()
-        {
-            @Override
-            public void onResponse(Message<T> msg)
+        BiConsumer<Integer, RequestFailureReason > maybeRecordRetry = 
(attempt, reason) -> {
+            if (attempt <= 0)
+                return;
+            // we don't know what the prefix kind is... so use NONE... this 
impacts logPrefix as it will cause us to use "repair" rather than "preview 
repair" which may not be correct... but close enough...
+            String prefix = 
PreviewKind.NONE.logPrefix(request.parentRepairSession());
+            RepairMetrics.retry(verb, attempt);
+            if (reason == null)
             {
-                maybeRecordRetry(null);
-                finalCallback.onResponse(msg);
+                noSpam.info("{} Retry of repair verb " + verb + " was 
successful after {} attempts", prefix, attempt);
             }
-
-            @Override
-            public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
+            else if (reason == RequestFailureReason.TIMEOUT)
             {
-                ErrorHandling allowed = errorHandlingSupported(ctx, endpoint, 
verb, request.parentRepairSession());
-                switch (allowed)
-                {
-                    case NONE:
-                        logger.error("[#{}] {} failed on {}: {}", 
request.parentRepairSession(), verb, from, failureReason);
-                        return;
-                    case TIMEOUT:
-                        finalCallback.onFailure(from, failureReason);
-                        return;
-                    case RETRY:
-                        int maxAttempts = backoff.maxAttempts();
-                        if (failureReason == RequestFailureReason.TIMEOUT && 
attempt < maxAttempts && allowRetry.get())
-                        {
-                            ctx.optionalTasks().schedule(() -> 
sendMessageWithRetries(ctx, backoff, allowRetry, request, verb, endpoint, 
finalCallback, attempt + 1),
-                                                         
backoff.computeWaitTime(attempt), backoff.unit());
-                            return;
-                        }
-                        maybeRecordRetry(failureReason);
-                        finalCallback.onFailure(from, failureReason);
-                        return;
-                    default:
-                        throw new AssertionError("Unknown error handler: " + 
allowed);
-                }
+                noSpam.warn("{} Timeout for repair verb " + verb + "; could 
not complete within {} attempts", prefix, attempt);
+                RepairMetrics.retryTimeout(verb);
             }
-
-            private void maybeRecordRetry(@Nullable RequestFailureReason 
reason)
+            else
             {
-                if (attempt <= 0)
-                    return;
-                // we don't know what the prefix kind is... so use NONE... 
this impacts logPrefix as it will cause us to use "repair" rather than "preview 
repair" which may not be correct... but close enough...
-                String prefix = 
PreviewKind.NONE.logPrefix(request.parentRepairSession());
-                RepairMetrics.retry(verb, attempt);
-                if (reason == null)
-                {
-                    noSpam.info("{} Retry of repair verb " + verb + " was 
successful after {} attempts", prefix, attempt);
-                }
-                else if (reason == RequestFailureReason.TIMEOUT)
-                {
-                    noSpam.warn("{} Timeout for repair verb " + verb + "; 
could not complete within {} attempts", prefix, attempt);
-                    RepairMetrics.retryTimeout(verb);
-                }
-                else
-                {
-                    noSpam.warn("{} {} failure for repair verb " + verb + "; 
could not complete within {} attempts", prefix, reason, attempt);
-                    RepairMetrics.retryFailure(verb);
-                }
-            }
-
-            @Override
-            public boolean invokeOnFailure()
-            {
-                return true;
+                noSpam.warn("{} {} failure for repair verb " + verb + "; could 
not complete within {} attempts", prefix, reason, attempt);
+                RepairMetrics.retryFailure(verb);
             }
         };
-        ctx.messaging().sendWithCallback(Message.outWithFlag(verb, request, 
CALL_BACK_ON_FAILURE),
-                                         endpoint,
-                                         callback);
+        ctx.messaging().sendWithRetries(backoff, ctx.optionalTasks()::schedule,
+                                        verb, request, 
Iterators.cycle(endpoint),
+                                        (int attempt, Message<T> msg, 
Throwable failure) -> {
+                                            if (failure == null)

Review Comment:
   failure case is handled in the other callbacks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to