sashapolo commented on code in PR #4543:
URL: https://github.com/apache/ignite-3/pull/4543#discussion_r1799785244
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java:
##########
@@ -556,167 +557,172 @@ private <R extends NetworkMessage> CompletableFuture<R>
sendWithRetry(
) {
var future = new CompletableFuture<R>();
- sendWithRetry(peer, requestFactory, currentTimeMillis() +
configuration.retryTimeout().value(), future, 1);
+ var context = new RetryContext(peer, requestFactory,
currentTimeMillis() + configuration.retryTimeout().value(), 0);
+
+ sendWithRetry(future, context);
return future;
}
/**
* Retries a request until success or timeout.
*
- * @param peer Initial target peer, request can be sent to a random peer
if the target peer is unavailable.
- * @param requestFactory Factory for creating requests to the target peer.
- * @param stopTime Stop time.
- * @param fut The future.
+ * @param fut Result future.
+ * @param retryContext Context.
* @param <R> Response type.
- * @param retryCount Number of retries made. sendWithRetry method has a
recursion nature, in case of recoverable exceptions or peer
- * unavailability it'll be scheduled for a next attempt. Generally a
request will be retried until success or timeout.
*/
- private <R extends NetworkMessage> void sendWithRetry(
- Peer peer,
- Function<Peer, ? extends NetworkMessage> requestFactory,
- long stopTime,
- CompletableFuture<R> fut,
- int retryCount
-
- ) {
+ private <R extends NetworkMessage> void sendWithRetry(CompletableFuture<R>
fut, RetryContext retryContext) {
if (!busyLock.enterBusy()) {
fut.cancel(true);
return;
}
try {
- if (currentTimeMillis() >= stopTime) {
- fut.completeExceptionally(
- new TimeoutException(format("Send with retry timed out
[retryCount = {}, groupId = {}].", retryCount, groupId)));
+ if (currentTimeMillis() >= retryContext.stopTime()) {
+ fut.completeExceptionally(new TimeoutException(format(
+ "Send with retry timed out [retryCount = {}, groupId =
{}].",
+ retryContext.retryCount(),
+ groupId
+ )));
return;
}
- NetworkMessage request = requestFactory.apply(peer);
+ NetworkMessage request = retryContext.request();
- resolvePeer(peer)
+ resolvePeer(retryContext.targetPeer())
.thenCompose(node ->
cluster.messagingService().invoke(node, request,
configuration.responseTimeout().value()))
.whenComplete((resp, err) -> {
if (LOG.isTraceEnabled()) {
LOG.trace("sendWithRetry req={} resp={} from={}
to={} err={}",
request,
resp,
cluster.topologyService().localMember().address(),
- peer.consistentId(),
+ retryContext.targetPeer().consistentId(),
err == null ? null : err.getMessage());
}
- if (err != null) {
- handleThrowable(err, peer, request,
requestFactory, stopTime, fut, retryCount);
- } else if (resp instanceof ErrorResponse) {
- handleErrorResponse((ErrorResponse) resp, peer,
request, requestFactory, stopTime, fut, retryCount);
- } else if (resp instanceof SMErrorResponse) {
- handleSmErrorResponse((SMErrorResponse) resp, fut);
- } else {
- leader = peer; // The OK response was received
from a leader.
-
- fut.complete((R) resp);
+ try {
+ if (err != null) {
+ handleThrowable(fut, err, retryContext);
+ } else if (resp instanceof ErrorResponse) {
+ handleErrorResponse(fut, (ErrorResponse) resp,
retryContext);
+ } else if (resp instanceof SMErrorResponse) {
+ handleSmErrorResponse(fut, (SMErrorResponse)
resp);
+ } else {
+ leader = retryContext.targetPeer(); // The OK
response was received from a leader.
+
+ fut.complete((R) resp);
+ }
+ } catch (Throwable e) {
+ fut.completeExceptionally(e);
Review Comment:
> Are we going to retry the request?
No, we are going to fail with an error, because there are no live peers left
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]