dcapwell commented on code in PR #3491:
URL: https://github.com/apache/cassandra/pull/3491#discussion_r1733183979


##########
src/java/org/apache/cassandra/net/MessageDelivery.java:
##########
@@ -74,9 +85,162 @@ public void onFailure(InetAddressAndPort from, 
RequestFailureReason reason)
     public <REQ, RSP> void sendWithCallback(Message<REQ> message, 
InetAddressAndPort to, RequestCallback<RSP> cb);
     public <REQ, RSP> void sendWithCallback(Message<REQ> message, 
InetAddressAndPort to, RequestCallback<RSP> cb, ConnectionType 
specifyConnection);
     public <REQ, RSP> Future<Message<RSP>> sendWithResult(Message<REQ> 
message, InetAddressAndPort to);
+
+    public default <REQ, RSP> Future<Message<RSP>> sendWithRetries(Backoff 
backoff,
+                                                                   
RetryScheduler retryThreads,
+                                                                   Verb verb, 
REQ request,
+                                                                   
InetAddressAndPort candidate,
+                                                                   
TriFunction<Integer, InetAddressAndPort, RequestFailureReason, Boolean> 
shouldRetry,
+                                                                   
RetryErrorMessage errorMessage)
+    {
+        return sendWithRetries(new AsyncPromise<>(), (Integer i, Message<RSP> 
msg) -> msg, backoff, retryThreads, verb, request, Iterators.cycle(candidate), 
shouldRetry, errorMessage);
+    }
+
+    public default <REQ, MSG_RSP, RSP> Future<RSP> 
sendWithRetries(BiFunction<Integer, Message<MSG_RSP>, RSP> msgToRsp,
+                                                                   Backoff 
backoff,
+                                                                   
RetryScheduler retryThreads,
+                                                                   Verb verb, 
REQ request,
+                                                                   
InetAddressAndPort candidate,
+                                                                   
TriFunction<Integer, InetAddressAndPort, RequestFailureReason, Boolean> 
shouldRetry,
+                                                                   
RetryErrorMessage errorMessage)
+    {
+        return sendWithRetries(new AsyncPromise<>(), msgToRsp, backoff, 
retryThreads, verb, request, Iterators.cycle(candidate), shouldRetry, 
errorMessage);
+    }
+
+    public default <REQ, MSG_RSP, RSP> Future<RSP> 
sendWithRetries(Promise<RSP> promise,
+                                                                   
BiFunction<Integer, Message<MSG_RSP>, RSP> msgToRsp,
+                                                                   Backoff 
backoff,
+                                                                   
RetryScheduler retryThreads,
+                                                                   Verb verb, 
REQ request,
+                                                                   
Iterator<InetAddressAndPort> candidates,
+                                                                   
TriFunction<Integer, InetAddressAndPort, RequestFailureReason, Boolean> 
shouldRetry,
+                                                                   
RetryErrorMessage errorMessage)
+    {
+        sendWithRetries(this, promise, msgToRsp, backoff, retryThreads, verb, 
request, candidates, shouldRetry, errorMessage, 0);
+        return promise;
+    }
     public <V> void respond(V response, Message<?> message);
     public default void respondWithFailure(RequestFailureReason reason, 
Message<?> message)
     {
         send(Message.failureResponse(message.id(), message.expiresAtNanos(), 
reason), message.respondTo());
     }
+
+    interface RetryErrorMessage
+    {
+        String apply(int attempt, ResponseFailureReason retryFailure, 
@Nullable InetAddressAndPort from, @Nullable RequestFailureReason reason);
+    }
+
+    private static <REQ, MSG_RSP, RSP> void sendWithRetries(MessageDelivery 
messaging,
+                                                            Promise<RSP> 
promise,
+                                                            
BiFunction<Integer, Message<MSG_RSP>, RSP> msgToRsp,
+                                                            Backoff backoff,
+                                                            RetryScheduler 
retryThreads,
+                                                            Verb verb, REQ 
request,
+                                                            
Iterator<InetAddressAndPort> candidates,
+                                                            
TriFunction<Integer, InetAddressAndPort, RequestFailureReason, Boolean> 
shouldRetry,
+                                                            RetryErrorMessage 
errorMessage,
+                                                            int attempt)
+    {
+        if (Thread.currentThread().isInterrupted())
+        {
+            promise.tryFailure(new 
InterruptedException(errorMessage.apply(attempt, 
ResponseFailureReason.Interrupted, null, null)));
+            return;
+        }
+        if (!candidates.hasNext())
+        {
+            promise.tryFailure(new 
NoMoreCandidatesException(errorMessage.apply(attempt, 
ResponseFailureReason.NoMoreCandidates, null, null)));
+            return;
+        }
+        class Request implements RequestCallbackWithFailure<MSG_RSP>
+        {
+            @Override
+            public void onResponse(Message<MSG_RSP> msg)
+            {
+                promise.trySuccess(msgToRsp.apply(attempt, msg));
+            }
+
+            @Override
+            public void onFailure(InetAddressAndPort from, 
RequestFailureReason failure)
+            {
+                if (!backoff.mayRetry(attempt))
+                {
+                    promise.tryFailure(new MaxRetriesException(attempt, 
errorMessage.apply(attempt, ResponseFailureReason.MaxRetries, from, failure)));
+                    return;
+                }
+                if (!shouldRetry.apply(attempt, from, failure))
+                {
+                    promise.tryFailure(new FailedResponseException(from, 
failure, errorMessage.apply(attempt, ResponseFailureReason.Rejected, from, 
failure)));
+                    return;
+                }
+                if (promise.isDone() || promise.isCancelled()) return;
+                try
+                {
+                    retryThreads.schedule(() -> sendWithRetries(messaging, 
promise, msgToRsp, backoff, retryThreads, verb, request, candidates, 
shouldRetry, errorMessage, attempt + 1),
+                                          backoff.computeWaitTime(attempt), 
backoff.unit());
+                }
+                catch (Throwable t)
+                {
+                    promise.tryFailure(new 
FailedScheduleException(errorMessage.apply(attempt, 
ResponseFailureReason.FailedSchedule, from, failure), t));
+                }
+            }
+        }
+        messaging.sendWithCallback(Message.outWithFlag(verb, request, 
CALL_BACK_ON_FAILURE), candidates.next(), new Request());
+    }
+
+    enum ResponseFailureReason { MaxRetries, Rejected, NoMoreCandidates, 
Interrupted, FailedSchedule }
+
+    interface RetryScheduler
+    {
+        void schedule(Runnable command, long delay, TimeUnit unit);
+    }
+
+    enum ImmediateRetryScheduler implements RetryScheduler

Review Comment:
   what do you mean, a name change?  have any in mind?  I went with this name 
as it matches `Executor`
   
   `org.apache.cassandra.concurrent.ImmediateExecutor`
   `io.netty.util.concurrent.ImmediateExecutor`
   
   guava uses the `direct` naming: 
`com.google.common.util.concurrent.MoreExecutors#directExecutor`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to