dcapwell commented on code in PR #3491:
URL: https://github.com/apache/cassandra/pull/3491#discussion_r1737152080


##########
src/java/org/apache/cassandra/repair/messages/RepairMessage.java:
##########
@@ -155,98 +153,94 @@ public static Supplier<Boolean> always()
 
     public static <T> void sendMessageWithRetries(SharedContext ctx, 
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint, RequestCallback<T> finalCallback)
     {
-        sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, 
verb, endpoint, finalCallback, 0);
+        sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, 
verb, endpoint, finalCallback);
     }
 
     public static <T> void sendMessageWithRetries(SharedContext ctx, 
RepairMessage request, Verb verb, InetAddressAndPort endpoint, 
RequestCallback<T> finalCallback)
     {
-        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, finalCallback, 0);
+        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, finalCallback);
     }
 
     public static void sendMessageWithRetries(SharedContext ctx, RepairMessage 
request, Verb verb, InetAddressAndPort endpoint)
     {
-        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, NOOP_CALLBACK, 0);
+        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, NOOP_CALLBACK);
     }
 
     public static void sendMessageWithRetries(SharedContext ctx, 
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint)
     {
-        sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, 
verb, endpoint, NOOP_CALLBACK, 0);
+        sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, 
verb, endpoint, NOOP_CALLBACK);
     }
 
     @VisibleForTesting
-    static <T> void sendMessageWithRetries(SharedContext ctx, Backoff backoff, 
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint, RequestCallback<T> finalCallback, int attempt)
+    static <T> void sendMessageWithRetries(SharedContext ctx, Backoff backoff, 
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint, RequestCallback<T> finalCallback)
     {
         if (!ALLOWS_RETRY.contains(verb))
             throw new AssertionError("Repair verb " + verb + " does not 
support retry, but a request to send with retry was given!");
-        RequestCallback<T> callback = new RequestCallback<>()
-        {
-            @Override
-            public void onResponse(Message<T> msg)
+        BiConsumer<Integer, RequestFailureReason > maybeRecordRetry = 
(attempt, reason) -> {
+            if (attempt <= 0)
+                return;
+            // we don't know what the prefix kind is... so use NONE... this 
impacts logPrefix as it will cause us to use "repair" rather than "preview 
repair" which may not be correct... but close enough...
+            String prefix = 
PreviewKind.NONE.logPrefix(request.parentRepairSession());
+            RepairMetrics.retry(verb, attempt);
+            if (reason == null)
             {
-                maybeRecordRetry(null);
-                finalCallback.onResponse(msg);
+                noSpam.info("{} Retry of repair verb " + verb + " was 
successful after {} attempts", prefix, attempt);
             }
-
-            @Override
-            public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
+            else if (reason == RequestFailureReason.TIMEOUT)
             {
-                ErrorHandling allowed = errorHandlingSupported(ctx, endpoint, 
verb, request.parentRepairSession());
-                switch (allowed)
-                {
-                    case NONE:
-                        logger.error("[#{}] {} failed on {}: {}", 
request.parentRepairSession(), verb, from, failureReason);
-                        return;
-                    case TIMEOUT:
-                        finalCallback.onFailure(from, failureReason);
-                        return;
-                    case RETRY:
-                        int maxAttempts = backoff.maxAttempts();
-                        if (failureReason == RequestFailureReason.TIMEOUT && 
attempt < maxAttempts && allowRetry.get())
-                        {
-                            ctx.optionalTasks().schedule(() -> 
sendMessageWithRetries(ctx, backoff, allowRetry, request, verb, endpoint, 
finalCallback, attempt + 1),
-                                                         
backoff.computeWaitTime(attempt), backoff.unit());
-                            return;
-                        }
-                        maybeRecordRetry(failureReason);
-                        finalCallback.onFailure(from, failureReason);
-                        return;
-                    default:
-                        throw new AssertionError("Unknown error handler: " + 
allowed);
-                }
+                noSpam.warn("{} Timeout for repair verb " + verb + "; could 
not complete within {} attempts", prefix, attempt);
+                RepairMetrics.retryTimeout(verb);
             }
-
-            private void maybeRecordRetry(@Nullable RequestFailureReason 
reason)
+            else
             {
-                if (attempt <= 0)
-                    return;
-                // we don't know what the prefix kind is... so use NONE... 
this impacts logPrefix as it will cause us to use "repair" rather than "preview 
repair" which may not be correct... but close enough...
-                String prefix = 
PreviewKind.NONE.logPrefix(request.parentRepairSession());
-                RepairMetrics.retry(verb, attempt);
-                if (reason == null)
-                {
-                    noSpam.info("{} Retry of repair verb " + verb + " was 
successful after {} attempts", prefix, attempt);
-                }
-                else if (reason == RequestFailureReason.TIMEOUT)
-                {
-                    noSpam.warn("{} Timeout for repair verb " + verb + "; 
could not complete within {} attempts", prefix, attempt);
-                    RepairMetrics.retryTimeout(verb);
-                }
-                else
-                {
-                    noSpam.warn("{} {} failure for repair verb " + verb + "; 
could not complete within {} attempts", prefix, reason, attempt);
-                    RepairMetrics.retryFailure(verb);
-                }
-            }
-
-            @Override
-            public boolean invokeOnFailure()
-            {
-                return true;
+                noSpam.warn("{} {} failure for repair verb " + verb + "; could 
not complete within {} attempts", prefix, reason, attempt);
+                RepairMetrics.retryFailure(verb);
             }
         };
-        ctx.messaging().sendWithCallback(Message.outWithFlag(verb, request, 
CALL_BACK_ON_FAILURE),
-                                         endpoint,
-                                         callback);
+        ctx.messaging().sendWithRetries(backoff, ctx.optionalTasks()::schedule,
+                                        verb, request, 
Iterators.cycle(endpoint),
+                                        (int attempt, Message<T> msg, 
Throwable failure) -> {
+                                            if (failure == null)
+                                            {
+                                                
maybeRecordRetry.accept(attempt, null);
+                                                finalCallback.onResponse(msg);
+                                            }
+                                        },
+                                        (attempt, from, failure) -> {
+                                            ErrorHandling allowed = 
errorHandlingSupported(ctx, endpoint, verb, request.parentRepairSession());
+                                            switch (allowed)
+                                            {
+                                                case NONE:
+                                                    logger.error("[#{}] {} 
failed on {}: {}", request.parentRepairSession(), verb, from, failure);
+                                                    return false;
+                                                case TIMEOUT:
+                                                    
finalCallback.onFailure(from, failure);
+                                                    return false;
+                                                case RETRY:
+                                                    if (failure == 
RequestFailureReason.TIMEOUT && allowRetry.get())
+                                                        return true;
+                                                    
maybeRecordRetry.accept(attempt, failure);
+                                                    
finalCallback.onFailure(from, failure);
+                                                    return false;
+                                                default:
+                                                    throw new 
AssertionError("Unknown error handler: " + allowed);
+                                            }
+                                        },
+                                        (attempt, retryReason, from, failure) 
-> {
+                                            switch (retryReason)
+                                            {
+                                                case MaxRetries:
+                                                    
maybeRecordRetry.accept(attempt, failure);

Review Comment:
   >  but this lambda seems to be responsible for returning an error string 
rather than controlling a callback
   
   It's logging + metrics.  Logging is targeted at operators and is different 
based off these inputs, so are the metrics... so returning a `String` would be 
hard as I then need to pattern match on that string.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to