jmckenzie-dev commented on code in PR #4030:
URL: https://github.com/apache/cassandra/pull/4030#discussion_r2025510382


##########
src/java/org/apache/cassandra/service/WriteResponseHandler.java:
##########
@@ -17,58 +17,375 @@
  */
 package org.apache.cassandra.service;
 
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.locator.ReplicaPlan;
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.WriteType;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlan.ForWrite;
+import org.apache.cassandra.net.CallbackResponseTracker;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.transport.Dispatcher;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Condition;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
-/**
- * Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels.
- */
-public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
+import static java.lang.Long.MAX_VALUE;
+import static java.lang.Math.min;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.cassandra.config.DatabaseDescriptor.getCounterWriteRpcTimeout;
+import static org.apache.cassandra.config.DatabaseDescriptor.getWriteRpcTimeout;
+import static org.apache.cassandra.db.WriteType.COUNTER;
+import static org.apache.cassandra.locator.Replicas.countInOurDc;
+import static org.apache.cassandra.schema.Schema.instance;
+import static org.apache.cassandra.service.StorageProxy.WritePerformer;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class WriteResponseHandler<T> implements RequestCallback<T>
 {
     protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class);
 
-    protected volatile int responses;
-    private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater
-            = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses");
+    private final Condition condition = newOneTimeCondition();
+    protected final ReplicaPlan.ForWrite replicaPlan;
+
+    protected final Runnable callback;
+    protected final WriteType writeType;
 
-    public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan,
-                                Runnable callback,
-                                WriteType writeType,
-                                Supplier<Mutation> hintOnFailure,
-                                Dispatcher.RequestTime requestTime)
+    private static final AtomicIntegerFieldUpdater<WriteResponseHandler> failuresUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "failureCount");
+    private volatile int failureCount = 0;
+
+    private final AtomicBoolean recordedMetric = new AtomicBoolean(false);
+
+    protected final Dispatcher.RequestTime requestTime;
+    protected final CallbackResponseTracker tracker;
+    private @Nullable final Supplier<Mutation> hintOnFailure;
+
+    /**
+     * The count of all the replicas we'd need to hit across DCs to qualify as hitting our "ideal" CL.
+     *
+     * Of note, this isn't a calculation of what would qualify as a quorum across all DCs, i.e. "EACH_QUORUM", but rather
+     * a brute-force calculation of what would equate to CL_ALL; for example, with two DCs at RF=3 each, the ideal count
+     * is 6, where EACH_QUORUM would only require 2 responses per DC. This is how we originally calculated things when
+     * this was implemented in CASSANDRA-13289, so we continue with that. It would be relatively trivial to extend this
+     * logic to a per-DC calculation similar to how we track per-DC responses in {@link EachQuorumResponseHandler#pendingResponsesPerDC}.
+     */
+    @SuppressWarnings("JavadocReference")
+    protected final int idealCLReplicaCount;
+
+    public WriteResponseHandler(ForWrite replicaPlan,
+                                WriteType writeType,
+                                Supplier<Mutation> hintOnFailure,
+                                Dispatcher.RequestTime requestTime)
     {
-        super(replicaPlan, callback, writeType, hintOnFailure, requestTime);
-        responses = blockFor();
+        this(replicaPlan, null, writeType, hintOnFailure, requestTime);
     }
 
-    public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, WriteType writeType, Supplier<Mutation> hintOnFailure, Dispatcher.RequestTime requestTime)
+    /**
+     * @param callback           A callback to be called when the write is successful.
+     * @param hintOnFailure      Enables/disables hinting on write failure (null disables hinting)
+     * @param requestTime        Initial request time of the mutation, used for timeout and backpressure calculation
+     */
+    public WriteResponseHandler(ForWrite replicaPlan,
+                                Runnable callback,
+                                WriteType writeType,
+                                Supplier<Mutation> hintOnFailure,
+                                Dispatcher.RequestTime requestTime)
     {
-        this(replicaPlan, null, writeType, hintOnFailure, requestTime);
+        this.replicaPlan = replicaPlan;
+        this.callback = callback;
+        this.writeType = writeType;
+        this.hintOnFailure = hintOnFailure;
+        this.tracker = new CallbackResponseTracker(replicaPlan.contacts().endpoints(), WriteResponseHandler.blockFor(replicaPlan));
+        this.requestTime = requestTime;
+        this.idealCLReplicaCount = replicaPlan.contacts().size();
     }
 
-    public void onResponse(Message<T> m)
+    /**
+     * Intended for use with DC-aware CLs; tracks the DC responses in the response tracker.
+     */
+    public void trackIdealCL(ReplicaPlan.ForWrite replicaPlan)
     {
-        replicaPlan.collectSuccess(m == null ? FBUtilities.getBroadcastAddressAndPort() : m.from());
-        if (responsesUpdater.decrementAndGet(this) == 0)
+        tracker.enableIdealCLTracking(replicaPlan);
+    }
+
+    public void get() throws WriteTimeoutException, WriteFailureException
+    {
+        long timeoutNanos = currentTimeoutNanos();
+
+        boolean signaled;
+        Map<InetAddressAndPort, RequestFailureReason> failuresByEndpoint;
+        try
+        {
+            signaled = condition.await(timeoutNanos, NANOSECONDS);
+            failuresByEndpoint = tracker.endProcessing();
+        }
+        catch (InterruptedException e)
+        {
+            throw new UncheckedInterruptedException(e);
+        }
+
+        if (signaled && receivedSufficientResponses())
+        {
+            replicaPlan.checkStillAppliesTo(ClusterMetadata.current());
+            return;
+        }
+
+        if (!signaled)
+            throwTimeout(failuresByEndpoint);
+
+        if (!receivedSufficientResponses())
+        {
+            // Only consider replicas that count towards our blockFor() / requiredResponses threshold when deciding whether to flag this as a timeout
+            if (RequestCallback.isTimeout(failuresByEndpoint.keySet().stream()
+                                                            .filter(this::waitingFor)
+                                                            .collect(Collectors.toMap(Function.identity(), failuresByEndpoint::get))))
+                throwTimeout(failuresByEndpoint);
+            throw new WriteFailureException(writeType, replicaPlan.consistencyLevel(), tracker.responseCount(), blockFor(), failuresByEndpoint);
+        }
+
+        replicaPlan.checkStillAppliesTo(ClusterMetadata.current());
+    }
+
+    private void throwTimeout(Map<InetAddressAndPort, RequestFailureReason> failures)
+    {
+        int blockedFor = blockFor();
+        int acks = tracker.responseCount();
+        // It's pretty unlikely, but we can race between exiting await above and here, so
+        // that we could now have enough acks. In that case, we "lie" on the acks count to
+        // avoid sending confusing info to the user (see CASSANDRA-6491).
+        if (acks >= blockedFor)
+            acks = blockedFor - 1;
+        throw WriteTimeoutException.withParticipants(writeType, replicaPlan.consistencyLevel(), acks, blockedFor, failures);
+    }
+
+    public final long currentTimeoutNanos()
+    {
+        long now = nanoTime();
+        long requestTimeout = writeType == COUNTER
+                              ? getCounterWriteRpcTimeout(NANOSECONDS)
+                              : getWriteRpcTimeout(NANOSECONDS);
+        return requestTime.computeTimeout(now, requestTimeout);
+    }
+
+    /**
+     * @return the minimum number of endpoints that must respond.
+     */
+    @VisibleForTesting
+    public int blockFor()
+    {
+        // During bootstrap, we have to include the pending endpoints or we may fail the consistency level
+        // guarantees (see #833)
+        return blockFor(replicaPlan);
+    }
+
+    public static int blockFor(ForWrite replicaPlan)
+    {
+        return replicaPlan.writeQuorum();
+    }
+
+    /**
+     * TODO: this method is brittle for its purpose of deciding when we should fail a query;
+     *       this needs to be aware of which nodes are live/down

Review Comment:
   Carried through in the refactor. This probably warrants a JIRA, but I don't quite 
know the original context. It looks like it was Benedict on CASSANDRA-14404, but 
the method's been touched twice since then, and now we have TCM in the mix, so 
I'm not sure it still applies.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

