ifesdjeen commented on code in PR #3491:
URL: https://github.com/apache/cassandra/pull/3491#discussion_r1732673364


##########
src/java/org/apache/cassandra/net/MessageDelivery.java:
##########
@@ -74,9 +85,162 @@ public void onFailure(InetAddressAndPort from, 
RequestFailureReason reason)
     public <REQ, RSP> void sendWithCallback(Message<REQ> message, 
InetAddressAndPort to, RequestCallback<RSP> cb);
     public <REQ, RSP> void sendWithCallback(Message<REQ> message, 
InetAddressAndPort to, RequestCallback<RSP> cb, ConnectionType 
specifyConnection);
     public <REQ, RSP> Future<Message<RSP>> sendWithResult(Message<REQ> 
message, InetAddressAndPort to);
+
+    public default <REQ, RSP> Future<Message<RSP>> sendWithRetries(Backoff 
backoff,
+                                                                   
RetryScheduler retryThreads,
+                                                                   Verb verb, 
REQ request,
+                                                                   
InetAddressAndPort candidate,
+                                                                   
TriFunction<Integer, InetAddressAndPort, RequestFailureReason, Boolean> 
shouldRetry,
+                                                                   
RetryErrorMessage errorMessage)
+    {
+        return sendWithRetries(new AsyncPromise<>(), (Integer i, Message<RSP> 
msg) -> msg, backoff, retryThreads, verb, request, Iterators.cycle(candidate), 
shouldRetry, errorMessage);
+    }
+
+    public default <REQ, MSG_RSP, RSP> Future<RSP> 
sendWithRetries(BiFunction<Integer, Message<MSG_RSP>, RSP> msgToRsp,
+                                                                   Backoff 
backoff,
+                                                                   
RetryScheduler retryThreads,
+                                                                   Verb verb, 
REQ request,
+                                                                   
InetAddressAndPort candidate,
+                                                                   
TriFunction<Integer, InetAddressAndPort, RequestFailureReason, Boolean> 
shouldRetry,
+                                                                   
RetryErrorMessage errorMessage)
+    {
+        return sendWithRetries(new AsyncPromise<>(), msgToRsp, backoff, 
retryThreads, verb, request, Iterators.cycle(candidate), shouldRetry, 
errorMessage);
+    }
+
+    public default <REQ, MSG_RSP, RSP> Future<RSP> 
sendWithRetries(Promise<RSP> promise,
+                                                                   
BiFunction<Integer, Message<MSG_RSP>, RSP> msgToRsp,
+                                                                   Backoff 
backoff,
+                                                                   
RetryScheduler retryThreads,
+                                                                   Verb verb, 
REQ request,
+                                                                   
Iterator<InetAddressAndPort> candidates,
+                                                                   
TriFunction<Integer, InetAddressAndPort, RequestFailureReason, Boolean> 
shouldRetry,
+                                                                   
RetryErrorMessage errorMessage)
+    {
+        sendWithRetries(this, promise, msgToRsp, backoff, retryThreads, verb, 
request, candidates, shouldRetry, errorMessage, 0);
+        return promise;
+    }
     public <V> void respond(V response, Message<?> message);
     public default void respondWithFailure(RequestFailureReason reason, 
Message<?> message)
     {
         send(Message.failureResponse(message.id(), message.expiresAtNanos(), 
reason), message.respondTo());
     }
+
+    interface RetryErrorMessage
+    {
+        String apply(int attempt, ResponseFailureReason retryFailure, 
@Nullable InetAddressAndPort from, @Nullable RequestFailureReason reason);
+    }
+
+    private static <REQ, MSG_RSP, RSP> void sendWithRetries(MessageDelivery 
messaging,

Review Comment:
   What if we tried something like this:
   
   ```
       interface OnResult<T>
       {
           public void result(int attempt, Message<T> success, Throwable 
failure);
       }
   
       interface RetryErrorMessage
       {
           String apply(int attempt, ResponseFailureReason retryFailure, 
@Nullable InetAddressAndPort from, @Nullable RequestFailureReason reason);
       }
   
       private static <REQ, MSG_RSP> void sendWithRetries(MessageDelivery 
messaging,
                                                          OnResult<REQ> 
onResult,
                                                          Backoff backoff,
                                                          RetryScheduler 
retryThreads,
                                                          Verb verb, REQ 
request,
                                                          
Iterator<InetAddressAndPort> candidates,
                                                          TriFunction<Integer, 
InetAddressAndPort, RequestFailureReason, Boolean> shouldRetry,
                                                          RetryErrorMessage 
errorMessage,
                                                          int attempt)
   
       {
           Promise<REQ> promise = new AsyncPromise<>();
           sendWithRetries(messaging, 
                           (att, res, throwable) -> {
                               if (res != null)
                                   promise.setSuccess(res.payload);
                               else 
                                   promise.setFailure(throwable);
                           },
                           backoff,
                           retryThreads,
                           verb, 
                           request,
                           candidates, 
                           shouldRetry, 
                           errorMessage,
                           0);
           try
           {
               promise.await().get();
           }
           catch (Throwable e)
           {
               throw new RuntimeException(e);
           }
       }
       private static <REQ, RSP> void sendWithRetriesInternal(MessageDelivery 
messaging,
                                                              OnResult<RSP> 
onResult,
                                                              Backoff backoff,
                                                              RetryScheduler 
retryThreads,
                                                              Verb verb, REQ 
request,
                                                              
Iterator<InetAddressAndPort> candidates,
                                                              
TriFunction<Integer, InetAddressAndPort, RequestFailureReason, Boolean> 
shouldRetry,
                                                              RetryErrorMessage 
errorMessage,
                                                              int attempt)
       {
           class Request implements RequestCallbackWithFailure<RSP>
           {
               @Override
               public void onResponse(Message<RSP> msg)
               {
                   onResult.result(attempt, msg, null);
               }
   
               @Override
               public void onFailure(InetAddressAndPort from, 
RequestFailureReason failure)
               {
                   if (Thread.currentThread().isInterrupted())
                   {
                       onResult.result(attempt, null, new 
InterruptedException(errorMessage.apply(attempt, 
ResponseFailureReason.Interrupted, null, null)));
                       return;
                   }
                   if (!candidates.hasNext())
                   {
                       onResult.result(attempt, null, new 
NoMoreCandidatesException(errorMessage.apply(attempt, 
ResponseFailureReason.NoMoreCandidates, null, null)));
                       return;
                   }
                   if (!backoff.mayRetry(attempt))
                   {
                       onResult.result(attempt, null, new 
MaxRetriesException(attempt, errorMessage.apply(attempt, 
ResponseFailureReason.MaxRetries, from, failure)));
                       return;
                   }
                   if (!shouldRetry.apply(attempt, from, failure))
                   {
                       onResult.result(attempt, null, new 
FailedResponseException(from, failure, errorMessage.apply(attempt, 
ResponseFailureReason.Rejected, from, failure)));
                       return;
                   }
                   try
                   {
                       retryThreads.schedule(() -> sendWithRetries(messaging, 
onResult, backoff, retryThreads, verb, request, candidates, shouldRetry, 
errorMessage, attempt + 1),
                                             backoff.computeWaitTime(attempt), 
backoff.unit());
                   }
                   catch (Throwable t)
                   {
                       onResult.result(attempt, null, new 
FailedScheduleException(errorMessage.apply(attempt, 
ResponseFailureReason.FailedSchedule, from, failure), t));
                   }
               }
           }
           messaging.sendWithCallback(Message.outWithFlag(verb, request, 
CALL_BACK_ON_FAILURE), candidates.next(), new Request());
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to