dcapwell commented on code in PR #3491:
URL: https://github.com/apache/cassandra/pull/3491#discussion_r1728089186


##########
src/java/org/apache/cassandra/repair/messages/RepairMessage.java:
##########
@@ -155,98 +152,92 @@ public static Supplier<Boolean> always()
 
     public static <T> void sendMessageWithRetries(SharedContext ctx, 
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint, RequestCallback<T> finalCallback)
     {
-        sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, 
verb, endpoint, finalCallback, 0);
+        sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, 
verb, endpoint, finalCallback);
     }
 
     public static <T> void sendMessageWithRetries(SharedContext ctx, 
RepairMessage request, Verb verb, InetAddressAndPort endpoint, 
RequestCallback<T> finalCallback)
     {
-        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, finalCallback, 0);
+        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, finalCallback);
     }
 
     public static void sendMessageWithRetries(SharedContext ctx, RepairMessage 
request, Verb verb, InetAddressAndPort endpoint)
     {
-        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, NOOP_CALLBACK, 0);
+        sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, NOOP_CALLBACK);
     }
 
     public static void sendMessageWithRetries(SharedContext ctx, 
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint)
     {
-        sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, 
verb, endpoint, NOOP_CALLBACK, 0);
+        sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, 
verb, endpoint, NOOP_CALLBACK);
     }
 
     @VisibleForTesting
-    static <T> void sendMessageWithRetries(SharedContext ctx, Backoff backoff, 
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint, RequestCallback<T> finalCallback, int attempt)
+    static <T> void sendMessageWithRetries(SharedContext ctx, Backoff backoff, 
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint, RequestCallback<T> finalCallback)
     {
         if (!ALLOWS_RETRY.contains(verb))
             throw new AssertionError("Repair verb " + verb + " does not 
support retry, but a request to send with retry was given!");
-        RequestCallback<T> callback = new RequestCallback<>()
-        {
-            @Override
-            public void onResponse(Message<T> msg)
+        BiConsumer<Integer, RequestFailureReason > maybeRecordRetry = 
(attempt, reason) -> {
+            if (attempt <= 0)
+                return;
+            // we don't know what the prefix kind is... so use NONE... this 
impacts logPrefix as it will cause us to use "repair" rather than "preview 
repair" which may not be correct... but close enough...
+            String prefix = 
PreviewKind.NONE.logPrefix(request.parentRepairSession());
+            RepairMetrics.retry(verb, attempt);
+            if (reason == null)
             {
-                maybeRecordRetry(null);
-                finalCallback.onResponse(msg);
+                noSpam.info("{} Retry of repair verb " + verb + " was 
successful after {} attempts", prefix, attempt);
             }
-
-            @Override
-            public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
+            else if (reason == RequestFailureReason.TIMEOUT)
             {
-                ErrorHandling allowed = errorHandlingSupported(ctx, endpoint, 
verb, request.parentRepairSession());
-                switch (allowed)
-                {
-                    case NONE:
-                        logger.error("[#{}] {} failed on {}: {}", 
request.parentRepairSession(), verb, from, failureReason);
-                        return;
-                    case TIMEOUT:
-                        finalCallback.onFailure(from, failureReason);
-                        return;
-                    case RETRY:
-                        int maxAttempts = backoff.maxAttempts();
-                        if (failureReason == RequestFailureReason.TIMEOUT && 
attempt < maxAttempts && allowRetry.get())
-                        {
-                            ctx.optionalTasks().schedule(() -> 
sendMessageWithRetries(ctx, backoff, allowRetry, request, verb, endpoint, 
finalCallback, attempt + 1),
-                                                         
backoff.computeWaitTime(attempt), backoff.unit());
-                            return;
-                        }
-                        maybeRecordRetry(failureReason);
-                        finalCallback.onFailure(from, failureReason);
-                        return;
-                    default:
-                        throw new AssertionError("Unknown error handler: " + 
allowed);
-                }
+                noSpam.warn("{} Timeout for repair verb " + verb + "; could 
not complete within {} attempts", prefix, attempt);
+                RepairMetrics.retryTimeout(verb);
             }
-
-            private void maybeRecordRetry(@Nullable RequestFailureReason 
reason)
+            else
             {
-                if (attempt <= 0)
-                    return;
-                // we don't know what the prefix kind is... so use NONE... 
this impacts logPrefix as it will cause us to use "repair" rather than "preview 
repair" which may not be correct... but close enough...
-                String prefix = 
PreviewKind.NONE.logPrefix(request.parentRepairSession());
-                RepairMetrics.retry(verb, attempt);
-                if (reason == null)
-                {
-                    noSpam.info("{} Retry of repair verb " + verb + " was 
successful after {} attempts", prefix, attempt);
-                }
-                else if (reason == RequestFailureReason.TIMEOUT)
-                {
-                    noSpam.warn("{} Timeout for repair verb " + verb + "; 
could not complete within {} attempts", prefix, attempt);
-                    RepairMetrics.retryTimeout(verb);
-                }
-                else
-                {
-                    noSpam.warn("{} {} failure for repair verb " + verb + "; 
could not complete within {} attempts", prefix, reason, attempt);
-                    RepairMetrics.retryFailure(verb);
-                }
-            }
-
-            @Override
-            public boolean invokeOnFailure()
-            {
-                return true;
+                noSpam.warn("{} {} failure for repair verb " + verb + "; could 
not complete within {} attempts", prefix, reason, attempt);
+                RepairMetrics.retryFailure(verb);
             }
         };
-        ctx.messaging().sendWithCallback(Message.outWithFlag(verb, request, 
CALL_BACK_ON_FAILURE),
-                                         endpoint,
-                                         callback);
+        ctx.messaging().sendWithRetries((Integer attempt, Message<T> msg) -> {

Review Comment:
   I wanted to centralize this as much as possible, but with this change I do 
   think I can avoid the allocation of the messaging promise: most callers do 
   `SnapshotTask extends AsyncFuture<InetAddressAndPort>` and just plumb the 
   message payload into themselves. Log ordering has been a problem with repair 
   (external tools rely on it), so I figured it was safest to just replace the 
   retry logic, as this logic is very well tested, and the optimization of 
   having one fewer future didn't seem worth the risk.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to