dcapwell commented on code in PR #2660:
URL: https://github.com/apache/cassandra/pull/2660#discussion_r1327797434
##########
src/java/org/apache/cassandra/service/ActiveRepairService.java:
##########
@@ -678,22 +670,60 @@ public boolean invokeOnFailure()
}
}
}
- try
+ // implement timeout to bound the runtime of the future
+ long timeoutMillis =
getRepairRetrys().isEnabled(RepairRetrySpec.Verb.PREPARE) ?
getRepairRpcTimeout(MILLISECONDS)
+
: getRpcTimeout(MILLISECONDS);
+ ctx.optionalTasks().schedule(() -> {
+ if (promise.isDone())
+ return;
+ String errorMsg = "Did not get replies from all endpoints.";
+ if (promise.tryFailure(new RuntimeException(errorMsg)))
+ participateFailed(parentRepairSession, errorMsg);
+ }, timeoutMillis, MILLISECONDS);
+
+ return promise;
+ }
+
+ private void sendPrepareWithRetries(TimeUUID parentRepairSession,
+ AtomicInteger pending,
+ Set<String> failedNodes,
+ AsyncPromise<Void> promise,
+ InetAddressAndPort to,
+ RepairMessage msg)
+ {
+ RepairMessage.sendMessageWithRetries(ctx, notDone(promise), msg,
PREPARE_MSG, to, new RequestCallback<>()
{
- if (!prepareLatch.await(getRpcTimeout(MILLISECONDS), MILLISECONDS)
|| timeouts.get() > 0)
- failRepair(parentRepairSession, "Did not get replies from all
endpoints.");
- }
- catch (InterruptedException e)
- {
- failRepair(parentRepairSession, "Interrupted while waiting for
prepare repair response.");
- }
+ @Override
+ public void onResponse(Message<Object> msg)
+ {
+ ack();
+ }
- if (!status.get())
- {
- failRepair(parentRepairSession, "Got negative replies from
endpoints " + failedNodes);
- }
+ @Override
+ public void onFailure(InetAddressAndPort from,
RequestFailureReason failureReason)
+ {
+ failedNodes.add(from.toString());
+ if (failureReason == RequestFailureReason.TIMEOUT)
+ {
+ pending.set(-1);
+
promise.setFailure(failRepairException(parentRepairSession, "Did not get
replies from all endpoints."));
+ }
+ else
+ {
+ ack();
+ }
+ }
+
+ private void ack()
+ {
+ if (pending.decrementAndGet() == 0)
+ {
+ if (failedNodes.isEmpty()) promise.setSuccess(null);
Review Comment:
hmmm, thought this was the style... this is something we do in accord for C*
style reasons...
just ub case u switched to the safe
```
if (failedNodes.isEmpty())
{
promise.setSuccess(null);
}
else
{
promise.setFailure(failRepairException(parentRepairSession, "Got negative
replies from endpoints " + failedNodes));
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]