dcapwell commented on code in PR #3491:
URL: https://github.com/apache/cassandra/pull/3491#discussion_r1733252423


##########
src/java/org/apache/cassandra/net/MessageDelivery.java:
##########
@@ -74,9 +85,162 @@ public void onFailure(InetAddressAndPort from, RequestFailureReason reason)
     public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb);
     public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb, ConnectionType specifyConnection);
     public <REQ, RSP> Future<Message<RSP>> sendWithResult(Message<REQ> message, InetAddressAndPort to);
+
+    public default <REQ, RSP> Future<Message<RSP>> sendWithRetries(Backoff backoff,
+                                                                   RetryScheduler retryThreads,
+                                                                   Verb verb, REQ request,
+                                                                   InetAddressAndPort candidate,
+                                                                   TriFunction<Integer, InetAddressAndPort, RequestFailureReason, Boolean> shouldRetry,
+                                                                   RetryErrorMessage errorMessage)
+    {
+        return sendWithRetries(new AsyncPromise<>(), (Integer i, Message<RSP> msg) -> msg, backoff, retryThreads, verb, request, Iterators.cycle(candidate), shouldRetry, errorMessage);
+    }
+
+    public default <REQ, MSG_RSP, RSP> Future<RSP> sendWithRetries(BiFunction<Integer, Message<MSG_RSP>, RSP> msgToRsp,
+                                                                   Backoff backoff,
+                                                                   RetryScheduler retryThreads,
+                                                                   Verb verb, REQ request,
+                                                                   InetAddressAndPort candidate,
+                                                                   TriFunction<Integer, InetAddressAndPort, RequestFailureReason, Boolean> shouldRetry,
+                                                                   RetryErrorMessage errorMessage)
+    {
+        return sendWithRetries(new AsyncPromise<>(), msgToRsp, backoff, retryThreads, verb, request, Iterators.cycle(candidate), shouldRetry, errorMessage);
+    }
+
+    public default <REQ, MSG_RSP, RSP> Future<RSP> sendWithRetries(Promise<RSP> promise,
+                                                                   BiFunction<Integer, Message<MSG_RSP>, RSP> msgToRsp,
+                                                                   Backoff backoff,
+                                                                   RetryScheduler retryThreads,
+                                                                   Verb verb, REQ request,
+                                                                   Iterator<InetAddressAndPort> candidates,
+                                                                   TriFunction<Integer, InetAddressAndPort, RequestFailureReason, Boolean> shouldRetry,
+                                                                   RetryErrorMessage errorMessage)
+    {
+        sendWithRetries(this, promise, msgToRsp, backoff, retryThreads, verb, request, candidates, shouldRetry, errorMessage, 0);
+        return promise;
+    }
     public <V> void respond(V response, Message<?> message);
     public default void respondWithFailure(RequestFailureReason reason, Message<?> message)
     {
         send(Message.failureResponse(message.id(), message.expiresAtNanos(), reason), message.respondTo());
     }
+
+    interface RetryErrorMessage
+    {
+        String apply(int attempt, ResponseFailureReason retryFailure, @Nullable InetAddressAndPort from, @Nullable RequestFailureReason reason);
+    }
+
+    private static <REQ, MSG_RSP, RSP> void sendWithRetries(MessageDelivery messaging,

Review Comment:
   For the TCM case, we can always add `if (promise.isDone() || promise.isCancelled()) return false;` to the retry function.
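
   A rough sketch of what that guard could look like, written as a wrapper around an existing retry predicate. Everything here is an assumption for illustration: `RetryGuardSketch`, `stopWhenDone`, and the local `TriFunction` are hypothetical stand-ins, `CompletableFuture` substitutes for the Cassandra `Promise`, and plain strings substitute for `InetAddressAndPort` and `RequestFailureReason`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

final class RetryGuardSketch
{
    // Hypothetical stand-in for the TriFunction type taken by sendWithRetries.
    @FunctionalInterface
    interface TriFunction<A, B, C, R>
    {
        R apply(A a, B b, C c);
    }

    // Wraps a retry predicate so that it refuses to retry once the result
    // future has completed or been cancelled; otherwise it defers to the delegate.
    static <A, B, C> TriFunction<A, B, C, Boolean> stopWhenDone(Future<?> result,
                                                                TriFunction<A, B, C, Boolean> delegate)
    {
        return (attempt, from, reason) -> {
            if (result.isDone() || result.isCancelled())
                return false;
            return delegate.apply(attempt, from, reason);
        };
    }

    public static void main(String[] args)
    {
        // CompletableFuture is only a stand-in for the promise passed to sendWithRetries.
        CompletableFuture<String> promise = new CompletableFuture<>();
        TriFunction<Integer, String, String, Boolean> shouldRetry =
            stopWhenDone(promise, (attempt, from, reason) -> attempt < 3);

        System.out.println(shouldRetry.apply(1, "127.0.0.1:7000", "TIMEOUT")); // prints true
        promise.cancel(false);
        System.out.println(shouldRetry.apply(1, "127.0.0.1:7000", "TIMEOUT")); // prints false
    }
}
```

   Keeping the check in the predicate means the caller opts in per call site, so the shared retry loop would not need to know about cancellation.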



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


