maedhroz commented on code in PR #2732:
URL: https://github.com/apache/cassandra/pull/2732#discussion_r1337828415
##########
src/java/org/apache/cassandra/repair/messages/RepairMessage.java:
##########
@@ -42,57 +55,182 @@
*/
public abstract class RepairMessage
{
- private static final CassandraVersion SUPPORTS_TIMEOUTS = new
CassandraVersion("4.0.7-SNAPSHOT");
+ private enum ErrorHandling { NONE, TIMEOUT, RETRY }
+ private static final CassandraVersion SUPPORTS_RETRY = new
CassandraVersion("5.0.0-alpha2.SNAPSHOT");
+ private static final Map<Verb, CassandraVersion> VERB_TIMEOUT_VERSIONS;
+
+ static
+ {
+ CassandraVersion timeoutVersion = new
CassandraVersion("4.0.7-SNAPSHOT");
+ EnumMap<Verb, CassandraVersion> map = new EnumMap<>(Verb.class);
+ map.put(Verb.VALIDATION_REQ, timeoutVersion);
+ map.put(Verb.SYNC_REQ, timeoutVersion);
+ map.put(Verb.VALIDATION_RSP, SUPPORTS_RETRY);
+ map.put(Verb.SYNC_RSP, SUPPORTS_RETRY);
+ VERB_TIMEOUT_VERSIONS = Collections.unmodifiableMap(map);
+ }
+ private static final Set<Verb> SUPPORTS_RETRY_WITHOUT_VERSION_CHECK =
Collections.unmodifiableSet(EnumSet.of(Verb.CLEANUP_MSG));
+
private static final Logger logger =
LoggerFactory.getLogger(RepairMessage.class);
+ @Nullable
public final RepairJobDesc desc;
- protected RepairMessage(RepairJobDesc desc)
+ protected RepairMessage(@Nullable RepairJobDesc desc)
{
this.desc = desc;
}
+ public TimeUUID parentRepairSession()
+ {
+ return desc.parentSessionId;
+ }
+
public interface RepairFailureCallback
{
void onFailure(Exception e);
}
- public static void sendMessageWithFailureCB(RepairMessage request, Verb
verb, InetAddressAndPort endpoint, RepairFailureCallback failureCallback)
+ private static Backoff backoff(SharedContext ctx, Verb verb)
+ {
+ RepairRetrySpec retrySpec = DatabaseDescriptor.getRepairRetrySpec();
+ RetrySpec spec = verb == Verb.VALIDATION_RSP ?
retrySpec.getMerkleTreeResponseSpec() : retrySpec;
+ if (!spec.isEnabled())
+ return Backoff.None.INSTANCE;
+ return new Backoff.ExponentialBackoff(spec.maxAttempts.value,
spec.baseSleepTime.toMilliseconds(), spec.maxSleepTime.toMilliseconds(),
ctx.random().get()::nextDouble);
+ }
+
+ public static Supplier<Boolean> notDone(Future<?> f)
{
- RequestCallback<?> callback = new RequestCallback<Object>()
+ return () -> !f.isDone();
+ }
+
+ private static Supplier<Boolean> always()
+ {
+ return () -> true;
+ }
+
+ public static <T> void sendMessageWithRetries(SharedContext ctx,
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb,
InetAddressAndPort endpoint, RequestCallback<T> finalCallback)
+ {
+ sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request,
verb, endpoint, finalCallback, 0);
+ }
+
+ public static <T> void sendMessageWithRetries(SharedContext ctx,
RepairMessage request, Verb verb, InetAddressAndPort endpoint,
RequestCallback<T> finalCallback)
+ {
+ sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request,
verb, endpoint, finalCallback, 0);
+ }
+
+ public static void sendMessageWithRetries(SharedContext ctx, RepairMessage
request, Verb verb, InetAddressAndPort endpoint)
+ {
+ sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request,
verb, endpoint, new RequestCallback<>()
{
@Override
public void onResponse(Message<Object> msg)
{
- logger.info("[#{}] {} received by {}",
request.desc.parentSessionId, verb, endpoint);
- // todo: at some point we should make repair messages follow
the normal path, actually using this
+ }
+
+ @Override
+ public void onFailure(InetAddressAndPort from,
RequestFailureReason failureReason)
+ {
+ }
+ }, 0);
+ }
+
+ private static <T> void sendMessageWithRetries(SharedContext ctx, Backoff
backoff, Supplier<Boolean> allowRetry, RepairMessage request, Verb verb,
InetAddressAndPort endpoint, RequestCallback<T> finalCallback, int attempt)
+ {
+ RequestCallback<T> callback = new RequestCallback<>()
+ {
+ @Override
+ public void onResponse(Message<T> msg)
+ {
+ finalCallback.onResponse(msg);
+ }
+
+ @Override
+ public void onFailure(InetAddressAndPort from,
RequestFailureReason failureReason)
+ {
+ ErrorHandling allowed = errorHandlingSupported(ctx, endpoint,
verb, request.parentRepairSession());
+ switch (allowed)
+ {
+ case NONE:
+ logger.error("[#{}] {} failed on {}: {}",
request.parentRepairSession(), verb, from, failureReason);
+ return;
+ case TIMEOUT:
+ finalCallback.onFailure(from, failureReason);
+ return;
+ case RETRY:
+ int maxAttempts = backoff.maxAttempts();
+ if (failureReason == RequestFailureReason.TIMEOUT &&
attempt < maxAttempts && allowRetry.get())
+ {
+ ctx.optionalTasks().schedule(() ->
sendMessageWithRetries(ctx, backoff, allowRetry, request, verb, endpoint,
finalCallback, attempt + 1),
+
backoff.computeWaitTime(attempt), backoff.unit());
+ return;
+ }
+ finalCallback.onFailure(from, failureReason);
+ return;
+ default:
+ throw new AssertionError("Unknown error handler: " +
allowed);
+ }
}
@Override
public boolean invokeOnFailure()
{
return true;
}
+ };
+ ctx.messaging().sendWithCallback(Message.outWithFlag(verb, request,
CALL_BACK_ON_FAILURE),
+ endpoint,
+ callback);
+ }
+ public static void sendMessageWithFailureCB(SharedContext ctx,
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb,
InetAddressAndPort endpoint, RepairFailureCallback failureCallback)
+ {
+ RequestCallback<?> callback = new RequestCallback<>()
+ {
+ @Override
+ public void onResponse(Message<Object> msg)
+ {
+ logger.info("[#{}] {} received by {}",
request.parentRepairSession(), verb, endpoint);
+ // todo: at some point we should make repair messages follow
the normal path, actually using this
+ }
+
+ @Override
public void onFailure(InetAddressAndPort from,
RequestFailureReason failureReason)
{
- logger.error("[#{}] {} failed on {}: {}",
request.desc.parentSessionId, verb, from, failureReason);
+ failureCallback.onFailure(RepairException.error(request.desc,
PreviewKind.NONE, String.format("Got %s failure from %s: %s", verb, from,
failureReason)));
+ }
- if (supportsTimeouts(from, request.desc.parentSessionId))
-
failureCallback.onFailure(RepairException.error(request.desc, PreviewKind.NONE,
String.format("Got %s failure from %s: %s", verb, from, failureReason)));
+ @Override
+ public boolean invokeOnFailure()
+ {
+ return true;
}
};
-
- MessagingService.instance().sendWithCallback(Message.outWithFlag(verb,
request, CALL_BACK_ON_FAILURE),
- endpoint,
- callback);
+ sendMessageWithRetries(ctx, allowRetry, request, verb, endpoint,
callback);
}
- private static boolean supportsTimeouts(InetAddressAndPort from, TimeUUID
parentSessionId)
+ private static ErrorHandling errorHandlingSupported(SharedContext ctx,
InetAddressAndPort from, Verb verb, TimeUUID parentSessionId)
{
- CassandraVersion remoteVersion =
Gossiper.instance.getReleaseVersion(from);
- if (remoteVersion != null &&
remoteVersion.compareTo(SUPPORTS_TIMEOUTS) >= 0)
- return true;
- logger.warn("[#{}] Not failing repair due to remote host {} not
supporting repair message timeouts (version = {})", parentSessionId, from,
remoteVersion);
- return false;
+ if (SUPPORTS_RETRY_WITHOUT_VERSION_CHECK.contains(verb))
+ return ErrorHandling.RETRY;
+ // Repair in mixed mode isn't fully supported, but also not actively
blocked... so in the common case all participants
+ // will be on the same version as this instance, so can avoid the
lookup from gossip
+ CassandraVersion remoteVersion =
ctx.gossiper().getReleaseVersion(from);
+ if (remoteVersion == null)
+ {
+ if (VERB_TIMEOUT_VERSIONS.containsKey(verb))
+ {
+ logger.warn("[#{}] Not failing repair due to remote host {}
not supporting repair message timeouts (version is unknown)", parentSessionId,
from);
+ return ErrorHandling.NONE;
+ }
+ return ErrorHandling.TIMEOUT;
+ }
+ if (remoteVersion.compareTo(SUPPORTS_RETRY) >= 0)
+ return ErrorHandling.RETRY;
+ CassandraVersion timeoutVersion = VERB_TIMEOUT_VERSIONS.get(verb);
+ if (timeoutVersion == null ||
+ remoteVersion.compareTo(timeoutVersion) >= 0)
Review Comment:
nit: Might be nice for this condition to be on one line. I have to do a
double-take with the `return` having the same indentation as the wrapped line.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]