jmckenzie-dev commented on code in PR #4030: URL: https://github.com/apache/cassandra/pull/4030#discussion_r2025510382
########## src/java/org/apache/cassandra/service/WriteResponseHandler.java: ########## @@ -17,58 +17,375 @@ */ package org.apache.cassandra.service; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; +import javax.annotation.Nullable; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.locator.ReplicaPlan; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.net.Message; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.WriteType; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.exceptions.WriteFailureException; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlan.ForWrite; +import org.apache.cassandra.net.CallbackResponseTracker; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.RequestCallback; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.transport.Dispatcher; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.Condition; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; -/** - * Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels. 
- */ -public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T> +import static java.lang.Long.MAX_VALUE; +import static java.lang.Math.min; +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.cassandra.config.DatabaseDescriptor.getCounterWriteRpcTimeout; +import static org.apache.cassandra.config.DatabaseDescriptor.getWriteRpcTimeout; +import static org.apache.cassandra.db.WriteType.COUNTER; +import static org.apache.cassandra.locator.Replicas.countInOurDc; +import static org.apache.cassandra.schema.Schema.instance; +import static org.apache.cassandra.service.StorageProxy.WritePerformer; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; +import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition; + +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class WriteResponseHandler<T> implements RequestCallback<T> { protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class); - protected volatile int responses; - private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater - = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses"); + private final Condition condition = newOneTimeCondition(); + protected final ReplicaPlan.ForWrite replicaPlan; + + protected final Runnable callback; + protected final WriteType writeType; - public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, - Runnable callback, - WriteType writeType, - Supplier<Mutation> hintOnFailure, - Dispatcher.RequestTime requestTime) + private static final AtomicIntegerFieldUpdater<WriteResponseHandler> failuresUpdater = + AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "failureCount"); + private volatile int failureCount = 0; + + private final AtomicBoolean recordedMetric = new AtomicBoolean(false); + + protected final Dispatcher.RequestTime requestTime; + 
protected final CallbackResponseTracker tracker; + private @Nullable final Supplier<Mutation> hintOnFailure; + + /** + * The count of all the replicas we'd need to hit across DC's to qualify as hitting our "ideal" + * + * Of note, this isn't a calculation of what would qualify as a quorum across all DC's, i.e. "EACH_QUORUM", but rather + * a brute force calculation of what would equate to CL_ALL. This is how we originally calculated things on implementation + * in CASSANDRA-13289 so we continue with that. It would be relatively trivial to extend this logic to include + * a per-DC calculation similar to how we track different DC responses in {@link EachQuorumResponseHandler#pendingResponsesPerDC} + */ + @SuppressWarnings("JavadocReference") + protected final int idealCLReplicaCount; + + public WriteResponseHandler(ForWrite replicaPlan, + WriteType writeType, + Supplier<Mutation> hintOnFailure, + Dispatcher.RequestTime requestTime) { - super(replicaPlan, callback, writeType, hintOnFailure, requestTime); - responses = blockFor(); + this(replicaPlan, null, writeType, hintOnFailure, requestTime); } - public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, WriteType writeType, Supplier<Mutation> hintOnFailure, Dispatcher.RequestTime requestTime) + /** + * @param callback A callback to be called when the write is successful. 
+ * @param hintOnFailure Enable/disable hinting on write failure + * @param requestTime Initial request time of the mutation to be used for timeouts and backpressure calculation + */ + public WriteResponseHandler(ForWrite replicaPlan, + Runnable callback, + WriteType writeType, + Supplier<Mutation> hintOnFailure, + Dispatcher.RequestTime requestTime) { - this(replicaPlan, null, writeType, hintOnFailure, requestTime); + this.replicaPlan = replicaPlan; + this.callback = callback; + this.writeType = writeType; + this.hintOnFailure = hintOnFailure; + this.tracker = new CallbackResponseTracker(replicaPlan.contacts().endpoints(), WriteResponseHandler.blockFor(replicaPlan)); + this.requestTime = requestTime; + this.idealCLReplicaCount = replicaPlan.contacts().size(); } - public void onResponse(Message<T> m) + /** + * Intended for use in DC-aware CL's, we'll track the DC responses in the response tracker. + */ + public void trackIdealCL(ReplicaPlan.ForWrite replicaPlan) { - replicaPlan.collectSuccess(m == null ? 
FBUtilities.getBroadcastAddressAndPort() : m.from()); - if (responsesUpdater.decrementAndGet(this) == 0) + tracker.enableIdealCLTracking(replicaPlan); + } + + public void get() throws WriteTimeoutException, WriteFailureException + { + long timeoutNanos = currentTimeoutNanos(); + + boolean signaled; + Map<InetAddressAndPort, RequestFailureReason> failuresByEndpoint; + try + { + signaled = condition.await(timeoutNanos, NANOSECONDS); + failuresByEndpoint = tracker.endProcessing(); + } + catch (InterruptedException e) + { + throw new UncheckedInterruptedException(e); + } + + if (signaled && receivedSufficientResponses()) + { + replicaPlan.checkStillAppliesTo(ClusterMetadata.current()); + return; + } + + if (!signaled) + throwTimeout(failuresByEndpoint); + + if (!receivedSufficientResponses()) + { + // We only want to consider replicas that count towards our blockFor() / requiredResponses threshold here to flag as a timeout + if (RequestCallback.isTimeout(failuresByEndpoint.keySet().stream() + .filter(this::waitingFor) + .collect(Collectors.toMap(Function.identity(), failuresByEndpoint::get)))) + throwTimeout(failuresByEndpoint); + throw new WriteFailureException(writeType, replicaPlan.consistencyLevel(), tracker.responseCount(), blockFor(), failuresByEndpoint); + } + + replicaPlan.checkStillAppliesTo(ClusterMetadata.current()); + } + + private void throwTimeout(Map<InetAddressAndPort, RequestFailureReason> failures) + { + int blockedFor = blockFor(); + int acks = tracker.responseCount(); + // It's pretty unlikely, but we can race between exiting await above and here, so + // that we could now have enough acks. In that case, we "lie" on the acks count to + // avoid sending confusing info to the user (see CASSANDRA-6491). 
+ if (acks >= blockedFor) + acks = blockedFor - 1; + throw WriteTimeoutException.withParticipants(writeType, replicaPlan.consistencyLevel(), acks, blockedFor, failures); + } + + public final long currentTimeoutNanos() + { + long now = nanoTime(); + long requestTimeout = writeType == COUNTER + ? getCounterWriteRpcTimeout(NANOSECONDS) + : getWriteRpcTimeout(NANOSECONDS); + return requestTime.computeTimeout(now, requestTimeout); + } + + /** + * @return the minimum number of endpoints that must respond. + */ + @VisibleForTesting + public int blockFor() + { + // During bootstrap, we have to include the pending endpoints or we may fail the consistency level + // guarantees (see #833) + return blockFor(replicaPlan); + } + + public static int blockFor(ForWrite replicaPlan) + { + return replicaPlan.writeQuorum(); + } + + /** + * TODO: this method is brittle for its purpose of deciding when we should fail a query; + * this needs to be aware of which nodes are live/down

Review Comment:
This TODO was carried through unchanged in the refactor. It probably should be a JIRA, but I don't quite know the original context. It looks like it came from Benedict on CASSANDRA-14404, but the method has been touched twice since then and we now have TCM in the mix, so I'm not sure whether it still applies.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org