jmckenzie-dev commented on code in PR #4030:
URL: https://github.com/apache/cassandra/pull/4030#discussion_r2025507934


##########
src/java/org/apache/cassandra/service/WriteResponseHandler.java:
##########
@@ -17,58 +17,375 @@
  */
 package org.apache.cassandra.service;
 
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.locator.ReplicaPlan;
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.WriteType;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlan.ForWrite;
+import org.apache.cassandra.net.CallbackResponseTracker;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.transport.Dispatcher;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Condition;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
-/**
- * Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL 
consistency levels.
- */
-public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
+import static java.lang.Long.MAX_VALUE;
+import static java.lang.Math.min;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static 
org.apache.cassandra.config.DatabaseDescriptor.getCounterWriteRpcTimeout;
+import static 
org.apache.cassandra.config.DatabaseDescriptor.getWriteRpcTimeout;
+import static org.apache.cassandra.db.WriteType.COUNTER;
+import static org.apache.cassandra.locator.Replicas.countInOurDc;
+import static org.apache.cassandra.schema.Schema.instance;
+import static org.apache.cassandra.service.StorageProxy.WritePerformer;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+import static 
org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class WriteResponseHandler<T> implements RequestCallback<T>
 {
     protected static final Logger logger = 
LoggerFactory.getLogger(WriteResponseHandler.class);
 
-    protected volatile int responses;
-    private static final AtomicIntegerFieldUpdater<WriteResponseHandler> 
responsesUpdater
-            = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, 
"responses");
+    private final Condition condition = newOneTimeCondition();
+    protected final ReplicaPlan.ForWrite replicaPlan;
+
+    protected final Runnable callback;
+    protected final WriteType writeType;
 
-    public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan,
-                                Runnable callback,
-                                WriteType writeType,
-                                Supplier<Mutation> hintOnFailure,
-                                Dispatcher.RequestTime requestTime)
+    private static final AtomicIntegerFieldUpdater<WriteResponseHandler> 
failuresUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, 
"failureCount");
+    private volatile int failureCount = 0;
+
+    private final AtomicBoolean recordedMetric = new AtomicBoolean(false);
+
+    protected final Dispatcher.RequestTime requestTime;
+    protected final CallbackResponseTracker tracker;
+    private @Nullable final Supplier<Mutation> hintOnFailure;
+
+    /**
+     * The count of all the replicas we'd need to hit across DC's to qualify 
as hitting our "ideal"
+     *
+     * Of note, this isn't a calculation of what would qualify as a quorum 
across all DC's, i.e. "EACH_QUORUM", but rather
+     * a brute force calculation of what would equate to CL_ALL. This is how 
we originally calculated things on implementation
+     * in CASSANDRA-13289 so we continue with that. It would be relatively 
trivial to extend this logic to include
+     * a per-DC calculation similar to how we track different DC responses in 
{@link EachQuorumResponseHandler#pendingResponsesPerDC}
+     */
+    @SuppressWarnings("JavadocReference")
+    protected final int idealCLReplicaCount;
+
+    public WriteResponseHandler(ForWrite replicaPlan,
+                                   WriteType writeType,
+                                   Supplier<Mutation> hintOnFailure,
+                                   Dispatcher.RequestTime requestTime)
     {
-        super(replicaPlan, callback, writeType, hintOnFailure, requestTime);
-        responses = blockFor();
+        this(replicaPlan, null, writeType, hintOnFailure, requestTime);
     }
 
-    public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, WriteType 
writeType, Supplier<Mutation> hintOnFailure, Dispatcher.RequestTime requestTime)
+    /**
+     * @param callback           A callback to be called when the write is 
successful.
+     * @param hintOnFailure      Enable/disable hinting on write failure
+     * @param requestTime        Initial request time of the mutation to be 
used for timeouts and backpressure calculation
+     */
+    public WriteResponseHandler(ForWrite replicaPlan,
+                                           Runnable callback,
+                                           WriteType writeType,
+                                           Supplier<Mutation> hintOnFailure,
+                                           Dispatcher.RequestTime requestTime)

Review Comment:
   Ok. This one had me curious.
   
   I inherited the partial javadoc from `AbstractWriteResponseHandler` which 
I've done away with; will definitely update that.
   
   Formatting is just... meh. Oops. /sad



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to