jmckenzie-dev commented on code in PR #4030: URL: https://github.com/apache/cassandra/pull/4030#discussion_r2025507934
########## src/java/org/apache/cassandra/service/WriteResponseHandler.java: ########## @@ -17,58 +17,375 @@ */ package org.apache.cassandra.service; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; +import javax.annotation.Nullable; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.locator.ReplicaPlan; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.net.Message; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.WriteType; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.exceptions.WriteFailureException; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlan.ForWrite; +import org.apache.cassandra.net.CallbackResponseTracker; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.RequestCallback; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.transport.Dispatcher; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.Condition; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; -/** - * Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels. - */ -public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T> +import static java.lang.Long.MAX_VALUE; +import static java.lang.Math.min; +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.cassandra.config.DatabaseDescriptor.getCounterWriteRpcTimeout; +import static org.apache.cassandra.config.DatabaseDescriptor.getWriteRpcTimeout; +import static org.apache.cassandra.db.WriteType.COUNTER; +import static org.apache.cassandra.locator.Replicas.countInOurDc; +import static org.apache.cassandra.schema.Schema.instance; +import static org.apache.cassandra.service.StorageProxy.WritePerformer; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; +import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition; + +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class WriteResponseHandler<T> implements RequestCallback<T> { protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class); - protected volatile int responses; - private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater - = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses"); + private final Condition condition = newOneTimeCondition(); + protected final ReplicaPlan.ForWrite replicaPlan; + + protected final Runnable callback; + protected final WriteType writeType; - public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, - Runnable callback, - WriteType writeType, - Supplier<Mutation> hintOnFailure, - Dispatcher.RequestTime requestTime) + private static final AtomicIntegerFieldUpdater<WriteResponseHandler> failuresUpdater = + AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "failureCount"); + private volatile int failureCount = 0; + + private final AtomicBoolean recordedMetric = new AtomicBoolean(false); + + protected final Dispatcher.RequestTime requestTime; + protected final CallbackResponseTracker tracker; + private @Nullable final Supplier<Mutation> hintOnFailure; + + /** + * The count of all the replicas we'd need to hit across DC's to qualify as hitting our "ideal" + * + * Of note, this isn't a calculation of what would qualify as a quorum across all DC's, i.e. "EACH_QUORUM", but rather + * a brute force calculation of what would equate to CL_ALL. This is how we originally calculated things on implementation + * in CASSANDRA-13289 so we continue with that. It would be relatively trivial to extend this logic to include + * a per-DC calculation similar to how we track different DC responses in {@link EachQuorumResponseHandler#pendingResponsesPerDC} + */ + @SuppressWarnings("JavadocReference") + protected final int idealCLReplicaCount; + + public WriteResponseHandler(ForWrite replicaPlan, + WriteType writeType, + Supplier<Mutation> hintOnFailure, + Dispatcher.RequestTime requestTime) { - super(replicaPlan, callback, writeType, hintOnFailure, requestTime); - responses = blockFor(); + this(replicaPlan, null, writeType, hintOnFailure, requestTime); } - public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, WriteType writeType, Supplier<Mutation> hintOnFailure, Dispatcher.RequestTime requestTime) + /** + * @param callback A callback to be called when the write is successful. + * @param hintOnFailure Enable/disable hinting on write failure + * @param requestTime Initial request time of the mutation to be used for timeouts and backpressure calculation + */ + public WriteResponseHandler(ForWrite replicaPlan, + Runnable callback, + WriteType writeType, + Supplier<Mutation> hintOnFailure, + Dispatcher.RequestTime requestTime) Review Comment: Ok. This one had me curious. I inherited the partial javadoc from `AbstractWriteResponseHandler` which I've done away with; will definitely update that. Formatting is just... meh. Oops. /sad -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org