krummas commented on code in PR #2660: URL: https://github.com/apache/cassandra/pull/2660#discussion_r1322427047
########## conf/cassandra.yaml: ########## @@ -723,6 +723,24 @@ memtable_allocation_type: heap_buffers # Min unit: MiB # repair_session_space: +# Configure the retries for each of the repair messages that support it. +# +# For more details see https://issues.apache.org/jira/browse/CASSANDRA-18816 +# +# repair: +# retries: +# type: Exponential +# max_attempts: 3 +# base_sleep_time: 200ms +# max_sleep_time: 1s +# verbs: +# # Increase the timeout of validation responses due to them containing the merkle tree +# VALIDATION_RSP: Review Comment: don't think we should expose verb names in the config ########## src/java/org/apache/cassandra/config/DatabaseDescriptor.java: ########## @@ -1899,6 +1899,11 @@ public static IFailureDetector newFailureDetector() return newFailureDetector.get(); } + public static void setFailureDetector(Supplier<IFailureDetector> fn) Review Comment: why do we need this when we have failureDetector() in SharedContext? ########## src/java/org/apache/cassandra/config/Properties.java: ########## @@ -68,6 +71,35 @@ public static Property andThen(Property root, Property leaf) return andThen(root, leaf, DELIMITER); } + @Nullable + public static Property andThenMap(Property root, String key, String delimiter) Review Comment: I think we can remove all this if we simplify the config. If we do need to keep this, we really need some docs on what this method does ########## src/java/org/apache/cassandra/config/DatabaseDescriptor.java: ########## @@ -4906,4 +4917,9 @@ public static DataStorageSpec.IntMebibytesBound getSAISegmentWriteBufferSpace() { return conf.sai_options.segment_write_buffer_size; } + + public static RepairRetrySpec getRepairRetrys() Review Comment: `getRepairRetries`? `getRepairRetrySpec`? 
########## src/java/org/apache/cassandra/service/ActiveRepairService.java: ########## @@ -213,25 +224,31 @@ public static ExecutorPlus repairCommandExecutor() return RepairCommandExecutorHandle.repairCommandExecutor; } - private final IFailureDetector failureDetector; - private final Gossiper gossiper; private final Cache<Integer, Pair<ParentRepairStatus, List<String>>> repairStatusByCmd; + public final ExecutorPlus snapshotExecutor; - public final ExecutorPlus snapshotExecutor = executorFactory().configurePooled("RepairSnapshotExecutor", 1) - .withKeepAlive(1, TimeUnit.HOURS) - .build(); + public ActiveRepairService() + { + this(SharedContext.Global.instance, CompactionManager.instance::getPendingTasks); + } - public ActiveRepairService(IFailureDetector failureDetector, Gossiper gossiper) + @VisibleForTesting + public ActiveRepairService(SharedContext ctx, + IntSupplier pendingTasks) Review Comment: maybe move `CompactionManager` into `SharedContext`? ########## src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java: ########## @@ -172,14 +194,23 @@ public void doVerb(final Message<RepairMessage> message) ColumnFamilyStore store = ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily); if (store == null) { - logger.error("Table {}.{} was dropped during validation phase of repair {}", - desc.keyspace, desc.columnFamily, desc.parentSessionId); - vState.phase.fail(String.format("Table %s.%s was dropped", desc.keyspace, desc.columnFamily)); - MessagingService.instance().send(Message.out(VALIDATION_RSP, new ValidationResponse(desc)), message.from()); + String msg = String.format("Table %s.% was dropped during validation phase of repair %s", desc.keyspace, desc.columnFamily, desc.parentSessionId); + vState.phase.fail(msg); + logErrorAndSendFailureResponse(msg, message); Review Comment: do we need to send both the failure and the empty `VALIDATION_RSP`? 
Looks like we call `tryFailure` on sending side in both cases ########## src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java: ########## @@ -172,14 +194,23 @@ public void doVerb(final Message<RepairMessage> message) ColumnFamilyStore store = ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily); if (store == null) { - logger.error("Table {}.{} was dropped during validation phase of repair {}", - desc.keyspace, desc.columnFamily, desc.parentSessionId); - vState.phase.fail(String.format("Table %s.%s was dropped", desc.keyspace, desc.columnFamily)); - MessagingService.instance().send(Message.out(VALIDATION_RSP, new ValidationResponse(desc)), message.from()); + String msg = String.format("Table %s.% was dropped during validation phase of repair %s", desc.keyspace, desc.columnFamily, desc.parentSessionId); + vState.phase.fail(msg); + logErrorAndSendFailureResponse(msg, message); + ctx.messaging().send(Message.out(VALIDATION_RSP, new ValidationResponse(desc)), message.from()); return; } - ActiveRepairService.instance.consistent.local.maybeSetRepairing(desc.parentSessionId); + try + { + ctx.repair().consistent.local.maybeSetRepairing(desc.parentSessionId); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + logErrorAndSendFailureResponse(t.toString(), message); + ctx.messaging().send(Message.out(VALIDATION_RSP, new ValidationResponse(desc)), message.from()); Review Comment: we should return here ########## src/java/org/apache/cassandra/repair/messages/SyncRequest.java: ########## @@ -69,6 +76,22 @@ public SyncRequest(RepairJobDesc desc, this.asymmetric = asymmetric; } + public UUID deterministicId() Review Comment: maybe we should add a sync request id instead of doing this It would change serialization, but we don't support repair between major versions anyway ########## src/java/org/apache/cassandra/repair/messages/RepairMessage.java: ########## @@ -42,57 +54,210 @@ */ public abstract class RepairMessage { - private 
static final CassandraVersion SUPPORTS_TIMEOUTS = new CassandraVersion("4.0.7-SNAPSHOT"); + private enum ErrorHandling { NONE, TIMEOUT, RETRY } + private static final CassandraVersion SUPPORTS_RETRY = new CassandraVersion("5.1.0-SNAPSHOT"); + private static final Map<Verb, CassandraVersion> VERB_TIMEOUT_VERSIONS; + + static + { + CassandraVersion timeoutVersion = new CassandraVersion("4.0.7-SNAPSHOT"); + EnumMap<Verb, CassandraVersion> map = new EnumMap<>(Verb.class); + map.put(Verb.VALIDATION_REQ, timeoutVersion); + map.put(Verb.SYNC_REQ, timeoutVersion); + map.put(Verb.VALIDATION_RSP, SUPPORTS_RETRY); + map.put(Verb.SYNC_RSP, SUPPORTS_RETRY); + VERB_TIMEOUT_VERSIONS = Collections.unmodifiableMap(map); + } + private static final Set<Verb> SUPPORTS_RETRY_WITHOUT_VERSION_CHECK = Collections.unmodifiableSet(EnumSet.of(Verb.CLEANUP_MSG)); + private static final Logger logger = LoggerFactory.getLogger(RepairMessage.class); + @Nullable public final RepairJobDesc desc; - protected RepairMessage(RepairJobDesc desc) + protected RepairMessage(@Nullable RepairJobDesc desc) { this.desc = desc; } + public TimeUUID parentRepairSession() + { + return desc.parentSessionId; + } + public interface RepairFailureCallback { void onFailure(Exception e); } - public static void sendMessageWithFailureCB(RepairMessage request, Verb verb, InetAddressAndPort endpoint, RepairFailureCallback failureCallback) + private static Backoff backoff(SharedContext ctx, Verb verb) { - RequestCallback<?> callback = new RequestCallback<Object>() + RepairRetrySpec.Verb configVerb = toConfigVerb(verb); + if (configVerb == null) + return Backoff.None.INSTANCE; + RepairRetrySpec retrySpec = DatabaseDescriptor.getRepairRetrys(); + RetrySpec spec = retrySpec.get(configVerb); + if (!spec.isEnabled()) + return Backoff.None.INSTANCE; + switch (spec.type) + { + case Exponential: + return new Backoff.ExponentialBackoff(spec.maxAttempts.value, spec.baseSleepTime.toMilliseconds(), spec.maxSleepTime.toMilliseconds(), 
ctx.random().get()::nextDouble); + default: + throw new IllegalArgumentException("Unknown type: " + spec.type); + } + } + + @Nullable + private static RepairRetrySpec.Verb toConfigVerb(Verb verb) + { + switch (verb) + { + case PREPARE_MSG: return RepairRetrySpec.Verb.PREPARE; + case VALIDATION_REQ: return RepairRetrySpec.Verb.VALIDATION_REQ; + case VALIDATION_RSP: return RepairRetrySpec.Verb.VALIDATION_RSP; + case SYNC_REQ: return RepairRetrySpec.Verb.SYNC_REQ; + case SYNC_RSP: return RepairRetrySpec.Verb.SYNC_RSP; + case SNAPSHOT_MSG: return RepairRetrySpec.Verb.SNAPSHOT; + case CLEANUP_MSG: return RepairRetrySpec.Verb.CLEANUP; + default: return null; + } + } + + public interface AllowRetry + { + boolean test(InetAddressAndPort from, RequestFailureReason failureReason, int attempt); + } + + public static AllowRetry notDone(Future<?> f) + { + return (i1, i2, i3) -> !f.isDone(); + } + + private static AllowRetry always() + { + return (i1, i2, i3) -> true; + } + + public static <T> void sendMessageWithRetries(SharedContext ctx, AllowRetry allowRetry, RepairMessage request, Verb verb, InetAddressAndPort endpoint, RequestCallback<T> finalCallback) + { + sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, verb, endpoint, finalCallback, 0); + } + + public static <T> void sendMessageWithRetries(SharedContext ctx, RepairMessage request, Verb verb, InetAddressAndPort endpoint, RequestCallback<T> finalCallback) + { + sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, verb, endpoint, finalCallback, 0); + } + + public static <T> void sendMessageWithRetries(SharedContext ctx, RepairMessage request, Verb verb, InetAddressAndPort endpoint) Review Comment: `<T>` is unused ########## src/java/org/apache/cassandra/service/ActiveRepairService.java: ########## @@ -678,22 +670,60 @@ public boolean invokeOnFailure() } } } - try + // implement timeout to bound the runtime of the future + long timeoutMillis = 
getRepairRetrys().isEnabled(RepairRetrySpec.Verb.PREPARE) ? getRepairRpcTimeout(MILLISECONDS) + : getRpcTimeout(MILLISECONDS); + ctx.optionalTasks().schedule(() -> { + if (promise.isDone()) + return; + String errorMsg = "Did not get replies from all endpoints."; + if (promise.tryFailure(new RuntimeException(errorMsg))) + participateFailed(parentRepairSession, errorMsg); + }, timeoutMillis, MILLISECONDS); + + return promise; + } + + private void sendPrepareWithRetries(TimeUUID parentRepairSession, + AtomicInteger pending, + Set<String> failedNodes, + AsyncPromise<Void> promise, + InetAddressAndPort to, + RepairMessage msg) + { + RepairMessage.sendMessageWithRetries(ctx, notDone(promise), msg, PREPARE_MSG, to, new RequestCallback<>() { - if (!prepareLatch.await(getRpcTimeout(MILLISECONDS), MILLISECONDS) || timeouts.get() > 0) - failRepair(parentRepairSession, "Did not get replies from all endpoints."); - } - catch (InterruptedException e) - { - failRepair(parentRepairSession, "Interrupted while waiting for prepare repair response."); - } + @Override + public void onResponse(Message<Object> msg) + { + ack(); + } - if (!status.get()) - { - failRepair(parentRepairSession, "Got negative replies from endpoints " + failedNodes); - } + @Override + public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) + { + failedNodes.add(from.toString()); + if (failureReason == RequestFailureReason.TIMEOUT) + { + pending.set(-1); + promise.setFailure(failRepairException(parentRepairSession, "Did not get replies from all endpoints.")); + } + else + { + ack(); + } + } + + private void ack() + { + if (pending.decrementAndGet() == 0) + { + if (failedNodes.isEmpty()) promise.setSuccess(null); Review Comment: code style ########## src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java: ########## @@ -172,14 +194,23 @@ public void doVerb(final Message<RepairMessage> message) ColumnFamilyStore store = ColumnFamilyStore.getIfExists(desc.keyspace, 
desc.columnFamily); if (store == null) { - logger.error("Table {}.{} was dropped during validation phase of repair {}", - desc.keyspace, desc.columnFamily, desc.parentSessionId); - vState.phase.fail(String.format("Table %s.%s was dropped", desc.keyspace, desc.columnFamily)); - MessagingService.instance().send(Message.out(VALIDATION_RSP, new ValidationResponse(desc)), message.from()); + String msg = String.format("Table %s.% was dropped during validation phase of repair %s", desc.keyspace, desc.columnFamily, desc.parentSessionId); + vState.phase.fail(msg); + logErrorAndSendFailureResponse(msg, message); Review Comment: should we send these `VALIDATION_RSP` messages with retries as well? ########## src/java/org/apache/cassandra/repair/messages/RepairMessage.java: ########## @@ -42,57 +54,210 @@ */ public abstract class RepairMessage { - private static final CassandraVersion SUPPORTS_TIMEOUTS = new CassandraVersion("4.0.7-SNAPSHOT"); + private enum ErrorHandling { NONE, TIMEOUT, RETRY } + private static final CassandraVersion SUPPORTS_RETRY = new CassandraVersion("5.1.0-SNAPSHOT"); + private static final Map<Verb, CassandraVersion> VERB_TIMEOUT_VERSIONS; + + static + { + CassandraVersion timeoutVersion = new CassandraVersion("4.0.7-SNAPSHOT"); + EnumMap<Verb, CassandraVersion> map = new EnumMap<>(Verb.class); + map.put(Verb.VALIDATION_REQ, timeoutVersion); + map.put(Verb.SYNC_REQ, timeoutVersion); + map.put(Verb.VALIDATION_RSP, SUPPORTS_RETRY); + map.put(Verb.SYNC_RSP, SUPPORTS_RETRY); + VERB_TIMEOUT_VERSIONS = Collections.unmodifiableMap(map); + } + private static final Set<Verb> SUPPORTS_RETRY_WITHOUT_VERSION_CHECK = Collections.unmodifiableSet(EnumSet.of(Verb.CLEANUP_MSG)); + private static final Logger logger = LoggerFactory.getLogger(RepairMessage.class); + @Nullable public final RepairJobDesc desc; - protected RepairMessage(RepairJobDesc desc) + protected RepairMessage(@Nullable RepairJobDesc desc) { this.desc = desc; } + public TimeUUID parentRepairSession() + 
{ + return desc.parentSessionId; + } + public interface RepairFailureCallback { void onFailure(Exception e); } - public static void sendMessageWithFailureCB(RepairMessage request, Verb verb, InetAddressAndPort endpoint, RepairFailureCallback failureCallback) + private static Backoff backoff(SharedContext ctx, Verb verb) { - RequestCallback<?> callback = new RequestCallback<Object>() + RepairRetrySpec.Verb configVerb = toConfigVerb(verb); + if (configVerb == null) + return Backoff.None.INSTANCE; + RepairRetrySpec retrySpec = DatabaseDescriptor.getRepairRetrys(); + RetrySpec spec = retrySpec.get(configVerb); + if (!spec.isEnabled()) + return Backoff.None.INSTANCE; + switch (spec.type) + { + case Exponential: + return new Backoff.ExponentialBackoff(spec.maxAttempts.value, spec.baseSleepTime.toMilliseconds(), spec.maxSleepTime.toMilliseconds(), ctx.random().get()::nextDouble); + default: + throw new IllegalArgumentException("Unknown type: " + spec.type); + } + } + + @Nullable + private static RepairRetrySpec.Verb toConfigVerb(Verb verb) + { + switch (verb) + { + case PREPARE_MSG: return RepairRetrySpec.Verb.PREPARE; + case VALIDATION_REQ: return RepairRetrySpec.Verb.VALIDATION_REQ; + case VALIDATION_RSP: return RepairRetrySpec.Verb.VALIDATION_RSP; + case SYNC_REQ: return RepairRetrySpec.Verb.SYNC_REQ; + case SYNC_RSP: return RepairRetrySpec.Verb.SYNC_RSP; + case SNAPSHOT_MSG: return RepairRetrySpec.Verb.SNAPSHOT; + case CLEANUP_MSG: return RepairRetrySpec.Verb.CLEANUP; + default: return null; + } + } + + public interface AllowRetry + { + boolean test(InetAddressAndPort from, RequestFailureReason failureReason, int attempt); Review Comment: these parameters are not used anywhere ########## src/java/org/apache/cassandra/config/RetrySpec.java: ########## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.config; + +import java.util.Objects; + +import javax.annotation.Nullable; + +import org.apache.cassandra.config.DurationSpec.LongMillisecondsBound; + +public class RetrySpec +{ + public enum Type { Exponential } Review Comment: only one Type, we should not abstract this until we actually need to. I understand that the end goal might be to have more than one type, but to simplify this patch we should not add abstractions we don't use. If we want to refactor all different kinds of retries we have in the db in the future, that should be done in a separate patch ########## src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java: ########## @@ -41,16 +42,24 @@ public class CassandraTableRepairManager implements TableRepairManager { private final ColumnFamilyStore cfs; + private final SharedContext ctx; public CassandraTableRepairManager(ColumnFamilyStore cfs) { this.cfs = cfs; + this.ctx = SharedContext.Global.instance; Review Comment: `this(cfs, SharedContext.Global.instance);` ########## src/java/org/apache/cassandra/config/RepairRetrySpec.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.config; + +import java.util.EnumMap; +import java.util.Map; + +public class RepairRetrySpec extends RetrySpec Review Comment: We only use this for repair; can we remove this abstraction? ########## src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java: ########## @@ -172,14 +194,23 @@ public void doVerb(final Message<RepairMessage> message) ColumnFamilyStore store = ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily); if (store == null) { - logger.error("Table {}.{} was dropped during validation phase of repair {}", - desc.keyspace, desc.columnFamily, desc.parentSessionId); - vState.phase.fail(String.format("Table %s.%s was dropped", desc.keyspace, desc.columnFamily)); - MessagingService.instance().send(Message.out(VALIDATION_RSP, new ValidationResponse(desc)), message.from()); + String msg = String.format("Table %s.% was dropped during validation phase of repair %s", desc.keyspace, desc.columnFamily, desc.parentSessionId); Review Comment: `%s.%` should be `%s.%s` ########## conf/cassandra.yaml: ########## @@ -723,6 +723,24 @@ memtable_allocation_type: heap_buffers # Min unit: MiB # repair_session_space: +# Configure the retries for each of the repair messages that support it. 
+# +# For more details see https://issues.apache.org/jira/browse/CASSANDRA-18816 +# +# repair: +# retries: +# type: Exponential +# max_attempts: 3 +# base_sleep_time: 200ms +# max_sleep_time: 1s +# verbs: +# # Increase the timeout of validation responses due to them containing the merkle tree +# VALIDATION_RSP: Review Comment: In general I doubt we need to be able to override attempts/sleep times etc on a per-message basis - I think we can retry equally for each message, with the exception for VALIDATION_RSP which obviously might need more time. so basically something like (not sure what is more obvious to users, `merkle_tree_...` or `validation_...`): ``` repair: max_attempts: 3 base_sleep_time: 200ms max_sleep_time: 1s merkle_tree_response_max_attempts: 3 ... ``` ########## src/java/org/apache/cassandra/service/ActiveRepairService.java: ########## @@ -150,28 +144,43 @@ @Simulate(with = MONITORS) public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener, ActiveRepairServiceMBean { + public enum ParentRepairStatus { IN_PROGRESS, COMPLETED, FAILED } - public static class ConsistentSessions + public class ConsistentSessions Review Comment: this can still be static ########## src/java/org/apache/cassandra/service/ActiveRepairService.java: ########## @@ -614,62 +632,36 @@ public static boolean verifyCompactionsPendingThreshold(TimeUUID parentRepairSes return true; } - public TimeUUID prepareForRepair(TimeUUID parentRepairSession, InetAddressAndPort coordinator, Set<InetAddressAndPort> endpoints, RepairOption options, boolean isForcedRepair, List<ColumnFamilyStore> columnFamilyStores) + public Future<?> prepareForRepair(TimeUUID parentRepairSession, InetAddressAndPort coordinator, Set<InetAddressAndPort> endpoints, RepairOption options, boolean isForcedRepair, List<ColumnFamilyStore> columnFamilyStores) { if (!verifyCompactionsPendingThreshold(parentRepairSession, options.getPreviewKind())) failRepair(parentRepairSession, 
"Rejecting incoming repair, pending compactions above threshold"); // failRepair throws exception long repairedAt = getRepairedAt(options, isForcedRepair); registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind()); - final CountDownLatch prepareLatch = newCountDownLatch(endpoints.size()); - final AtomicBoolean status = new AtomicBoolean(true); - final Set<String> failedNodes = synchronizedSet(new HashSet<String>()); - final AtomicInteger timeouts = new AtomicInteger(0); - RequestCallback callback = new RequestCallback() - { - @Override - public void onResponse(Message msg) - { - prepareLatch.decrement(); - } - - @Override - public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) - { - status.set(false); - failedNodes.add(from.toString()); - if (failureReason == RequestFailureReason.TIMEOUT) - timeouts.incrementAndGet(); - prepareLatch.decrement(); - } - - @Override - public boolean invokeOnFailure() - { - return true; - } - }; + final AtomicInteger pending = new AtomicInteger(endpoints.size()); Review Comment: we can drop the `final` keyword on these ########## src/java/org/apache/cassandra/repair/messages/RepairMessage.java: ########## @@ -42,57 +54,210 @@ */ public abstract class RepairMessage { - private static final CassandraVersion SUPPORTS_TIMEOUTS = new CassandraVersion("4.0.7-SNAPSHOT"); + private enum ErrorHandling { NONE, TIMEOUT, RETRY } + private static final CassandraVersion SUPPORTS_RETRY = new CassandraVersion("5.1.0-SNAPSHOT"); + private static final Map<Verb, CassandraVersion> VERB_TIMEOUT_VERSIONS; + + static + { + CassandraVersion timeoutVersion = new CassandraVersion("4.0.7-SNAPSHOT"); + EnumMap<Verb, CassandraVersion> map = new EnumMap<>(Verb.class); + map.put(Verb.VALIDATION_REQ, timeoutVersion); + map.put(Verb.SYNC_REQ, timeoutVersion); + map.put(Verb.VALIDATION_RSP, SUPPORTS_RETRY); + 
map.put(Verb.SYNC_RSP, SUPPORTS_RETRY); + VERB_TIMEOUT_VERSIONS = Collections.unmodifiableMap(map); + } + private static final Set<Verb> SUPPORTS_RETRY_WITHOUT_VERSION_CHECK = Collections.unmodifiableSet(EnumSet.of(Verb.CLEANUP_MSG)); + private static final Logger logger = LoggerFactory.getLogger(RepairMessage.class); + @Nullable public final RepairJobDesc desc; - protected RepairMessage(RepairJobDesc desc) + protected RepairMessage(@Nullable RepairJobDesc desc) { this.desc = desc; } + public TimeUUID parentRepairSession() + { + return desc.parentSessionId; + } + public interface RepairFailureCallback { void onFailure(Exception e); } - public static void sendMessageWithFailureCB(RepairMessage request, Verb verb, InetAddressAndPort endpoint, RepairFailureCallback failureCallback) + private static Backoff backoff(SharedContext ctx, Verb verb) { - RequestCallback<?> callback = new RequestCallback<Object>() + RepairRetrySpec.Verb configVerb = toConfigVerb(verb); + if (configVerb == null) + return Backoff.None.INSTANCE; + RepairRetrySpec retrySpec = DatabaseDescriptor.getRepairRetrys(); + RetrySpec spec = retrySpec.get(configVerb); + if (!spec.isEnabled()) + return Backoff.None.INSTANCE; + switch (spec.type) + { + case Exponential: + return new Backoff.ExponentialBackoff(spec.maxAttempts.value, spec.baseSleepTime.toMilliseconds(), spec.maxSleepTime.toMilliseconds(), ctx.random().get()::nextDouble); + default: + throw new IllegalArgumentException("Unknown type: " + spec.type); + } + } + + @Nullable + private static RepairRetrySpec.Verb toConfigVerb(Verb verb) + { + switch (verb) + { + case PREPARE_MSG: return RepairRetrySpec.Verb.PREPARE; + case VALIDATION_REQ: return RepairRetrySpec.Verb.VALIDATION_REQ; + case VALIDATION_RSP: return RepairRetrySpec.Verb.VALIDATION_RSP; + case SYNC_REQ: return RepairRetrySpec.Verb.SYNC_REQ; + case SYNC_RSP: return RepairRetrySpec.Verb.SYNC_RSP; + case SNAPSHOT_MSG: return RepairRetrySpec.Verb.SNAPSHOT; + case CLEANUP_MSG: return 
RepairRetrySpec.Verb.CLEANUP; + default: return null; + } + } + + public interface AllowRetry + { + boolean test(InetAddressAndPort from, RequestFailureReason failureReason, int attempt); + } + + public static AllowRetry notDone(Future<?> f) + { + return (i1, i2, i3) -> !f.isDone(); + } + + private static AllowRetry always() + { + return (i1, i2, i3) -> true; + } + + public static <T> void sendMessageWithRetries(SharedContext ctx, AllowRetry allowRetry, RepairMessage request, Verb verb, InetAddressAndPort endpoint, RequestCallback<T> finalCallback) + { + sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, verb, endpoint, finalCallback, 0); + } + + public static <T> void sendMessageWithRetries(SharedContext ctx, RepairMessage request, Verb verb, InetAddressAndPort endpoint, RequestCallback<T> finalCallback) + { + sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, verb, endpoint, finalCallback, 0); + } + + public static <T> void sendMessageWithRetries(SharedContext ctx, RepairMessage request, Verb verb, InetAddressAndPort endpoint) + { + sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, verb, endpoint, new RequestCallback<>() { @Override public void onResponse(Message<Object> msg) { - logger.info("[#{}] {} received by {}", request.desc.parentSessionId, verb, endpoint); - // todo: at some point we should make repair messages follow the normal path, actually using this + } + + @Override + public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) + { + } + }, 0); + } + + private static <T> void sendMessageWithRetries(SharedContext ctx, Backoff backoff, AllowRetry allowRetry, RepairMessage request, Verb verb, InetAddressAndPort endpoint, RequestCallback<T> finalCallback, int attempt) + { + RequestCallback<T> callback = new RequestCallback<>() + { + @Override + public void onResponse(Message<T> msg) + { + finalCallback.onResponse(msg); + } + + @Override + public void onFailure(InetAddressAndPort 
from, RequestFailureReason failureReason) + { + ErrorHandling allowed = errorHandlingSupported(ctx, endpoint, verb, request.parentRepairSession()); + switch (allowed) + { + case NONE: + logger.error("[#{}] {} failed on {}: {}", request.parentRepairSession(), verb, from, failureReason); + return; + case TIMEOUT: + finalCallback.onFailure(from, failureReason); + return; + case RETRY: + int maxAttempts = backoff.maxAttempts(); + if (failureReason == RequestFailureReason.TIMEOUT && attempt < maxAttempts && allowRetry.test(from, failureReason, attempt)) + { + ctx.optionalTasks().schedule(() -> sendMessageWithRetries(ctx, backoff, allowRetry, request, verb, endpoint, finalCallback, attempt + 1), + backoff.computeWaitTime(attempt), backoff.unit()); + return; + } + finalCallback.onFailure(from, failureReason); + return; + default: + throw new AssertionError("Unknown error handler: " + allowed); + } } @Override public boolean invokeOnFailure() { return true; } + }; + ctx.messaging().sendWithCallback(Message.outWithFlag(verb, request, CALL_BACK_ON_FAILURE), + endpoint, + callback); + } + public static void sendMessageWithFailureCB(SharedContext ctx, AllowRetry allowRetry, RepairMessage request, Verb verb, InetAddressAndPort endpoint, RepairFailureCallback failureCallback) + { + RequestCallback<?> callback = new RequestCallback<>() + { + @Override + public void onResponse(Message<Object> msg) + { + logger.info("[#{}] {} received by {}", request.parentRepairSession(), verb, endpoint); + // todo: at some point we should make repair messages follow the normal path, actually using this + } + + @Override public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) { - logger.error("[#{}] {} failed on {}: {}", request.desc.parentSessionId, verb, from, failureReason); + failureCallback.onFailure(RepairException.error(request.desc, PreviewKind.NONE, String.format("Got %s failure from %s: %s", verb, from, failureReason))); + } - if (supportsTimeouts(from, 
request.desc.parentSessionId)) - failureCallback.onFailure(RepairException.error(request.desc, PreviewKind.NONE, String.format("Got %s failure from %s: %s", verb, from, failureReason))); + @Override + public boolean invokeOnFailure() + { + return true; } }; - - MessagingService.instance().sendWithCallback(Message.outWithFlag(verb, request, CALL_BACK_ON_FAILURE), - endpoint, - callback); + sendMessageWithRetries(ctx, allowRetry, request, verb, endpoint, callback); } - private static boolean supportsTimeouts(InetAddressAndPort from, TimeUUID parentSessionId) + private static ErrorHandling errorHandlingSupported(SharedContext ctx, InetAddressAndPort from, Verb verb, TimeUUID parentSessionId) { - CassandraVersion remoteVersion = Gossiper.instance.getReleaseVersion(from); - if (remoteVersion != null && remoteVersion.compareTo(SUPPORTS_TIMEOUTS) >= 0) - return true; - logger.warn("[#{}] Not failing repair due to remote host {} not supporting repair message timeouts (version = {})", parentSessionId, from, remoteVersion); - return false; + if (SUPPORTS_RETRY_WITHOUT_VERSION_CHECK.contains(verb)) + return ErrorHandling.RETRY; + // Repair in mixed mode isn't fully supported, but also not activally blocked... so in the common case all participants + // will be on the same version as this instance, so can avoid the lookup from gossip + CassandraVersion remoteVersion = ctx.gossiper().getReleaseVersion(from); + if (remoteVersion == null) + { + if (VERB_TIMEOUT_VERSIONS.containsKey(verb)) + { + logger.warn("[#{}] Not failing repair due to remote host {} not supporting repair message timeouts (version = {})", parentSessionId, from, remoteVersion); Review Comment: `remoteVersion` is always null here ########## src/java/org/apache/cassandra/service/StorageServiceMBean.java: ########## @@ -477,7 +478,7 @@ default int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, /** * Get the status of a given parent repair session. 
* @param cmd the int reference returned when issuing the repair - * @return status of parent repair from enum {@link org.apache.cassandra.repair.RepairRunnable.Status} + * @return status of parent repair from enum {@link RepairCoordinator.Status} Review Comment: this should be `ParentRepairStatus` ########## src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java: ########## @@ -172,14 +194,23 @@ public void doVerb(final Message<RepairMessage> message) ColumnFamilyStore store = ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily); if (store == null) { - logger.error("Table {}.{} was dropped during validation phase of repair {}", - desc.keyspace, desc.columnFamily, desc.parentSessionId); - vState.phase.fail(String.format("Table %s.%s was dropped", desc.keyspace, desc.columnFamily)); - MessagingService.instance().send(Message.out(VALIDATION_RSP, new ValidationResponse(desc)), message.from()); + String msg = String.format("Table %s.% was dropped during validation phase of repair %s", desc.keyspace, desc.columnFamily, desc.parentSessionId); + vState.phase.fail(msg); + logErrorAndSendFailureResponse(msg, message); + ctx.messaging().send(Message.out(VALIDATION_RSP, new ValidationResponse(desc)), message.from()); return; } - ActiveRepairService.instance.consistent.local.maybeSetRepairing(desc.parentSessionId); + try + { + ctx.repair().consistent.local.maybeSetRepairing(desc.parentSessionId); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + logErrorAndSendFailureResponse(t.toString(), message); + ctx.messaging().send(Message.out(VALIDATION_RSP, new ValidationResponse(desc)), message.from()); Review Comment: also sending both failure and empty VALIDATION_RSP; if it's needed we probably need a comment where we do this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

