jonmeredith commented on code in PR #4030:
URL: https://github.com/apache/cassandra/pull/4030#discussion_r2023656403


##########
src/java/org/apache/cassandra/locator/ReplicaPlan.java:
##########
@@ -355,9 +357,20 @@ private ForWrite copy(ConsistencyLevel 
newConsistencyLevel, EndpointsForToken ne
             return res;
         }
 
-        ForWrite withConsistencyLevel(ConsistencyLevel newConsistencylevel) { 
return copy(newConsistencylevel, contacts()); }
+        @VisibleForTesting
+        public ForWrite withConsistencyLevel(ConsistencyLevel 
newConsistencylevel) { return copy(newConsistencylevel, contacts()); }
         public ForWrite withContacts(EndpointsForToken newContact) { return 
copy(consistencyLevel, newContact); }
 
+        /**
+         * This raises an {@link IllegalArgumentException} if the state has 
changed; syntactic sugar around
+         * {@link #stillAppliesTo(ClusterMetadata)} making the calling 
convention a bit more ergonomic for the base case
+         * of "return true or raise an exception"
+         */
+        public void checkStillAppliesTo(ClusterMetadata newMetadata)
+        {
+            stillAppliesTo(newMetadata);
+        }

Review Comment:
   Why not rename the `stillAppliesTo` method itself, rather than adding a 
wrapper that only delegates to it?



##########
src/java/org/apache/cassandra/service/WriteResponseHandler.java:
##########
@@ -17,58 +17,375 @@
  */
 package org.apache.cassandra.service;
 
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.locator.ReplicaPlan;
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.WriteType;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlan.ForWrite;
+import org.apache.cassandra.net.CallbackResponseTracker;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.transport.Dispatcher;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Condition;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
-/**
- * Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL 
consistency levels.
- */
-public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
+import static java.lang.Long.MAX_VALUE;
+import static java.lang.Math.min;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static 
org.apache.cassandra.config.DatabaseDescriptor.getCounterWriteRpcTimeout;
+import static 
org.apache.cassandra.config.DatabaseDescriptor.getWriteRpcTimeout;
+import static org.apache.cassandra.db.WriteType.COUNTER;
+import static org.apache.cassandra.locator.Replicas.countInOurDc;
+import static org.apache.cassandra.schema.Schema.instance;
+import static org.apache.cassandra.service.StorageProxy.WritePerformer;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+import static 
org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class WriteResponseHandler<T> implements RequestCallback<T>
 {
     protected static final Logger logger = 
LoggerFactory.getLogger(WriteResponseHandler.class);
 
-    protected volatile int responses;
-    private static final AtomicIntegerFieldUpdater<WriteResponseHandler> 
responsesUpdater
-            = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, 
"responses");
+    private final Condition condition = newOneTimeCondition();
+    protected final ReplicaPlan.ForWrite replicaPlan;
+
+    protected final Runnable callback;
+    protected final WriteType writeType;
 
-    public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan,
-                                Runnable callback,
-                                WriteType writeType,
-                                Supplier<Mutation> hintOnFailure,
-                                Dispatcher.RequestTime requestTime)
+    private static final AtomicIntegerFieldUpdater<WriteResponseHandler> 
failuresUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, 
"failureCount");
+    private volatile int failureCount = 0;
+
+    private final AtomicBoolean recordedMetric = new AtomicBoolean(false);
+
+    protected final Dispatcher.RequestTime requestTime;
+    protected final CallbackResponseTracker tracker;
+    private @Nullable final Supplier<Mutation> hintOnFailure;
+
+    /**
+     * The count of all the replicas we'd need to hit across DC's to qualify 
as hitting our "ideal"
+     *
+     * Of note, this isn't a calculation of what would qualify as a quorum 
across all DC's, i.e. "EACH_QUORUM", but rather
+     * a brute force calculation of what would equate to CL_ALL. This is how 
we originally calculated things on implementation
+     * in CASSANDRA-13289 so we continue with that. It would be relatively 
trivial to extend this logic to include
+     * a per-DC calculation similar to how we track different DC responses in 
{@link EachQuorumResponseHandler#pendingResponsesPerDC}
+     */
+    @SuppressWarnings("JavadocReference")
+    protected final int idealCLReplicaCount;
+
+    public WriteResponseHandler(ForWrite replicaPlan,
+                                   WriteType writeType,
+                                   Supplier<Mutation> hintOnFailure,
+                                   Dispatcher.RequestTime requestTime)
     {
-        super(replicaPlan, callback, writeType, hintOnFailure, requestTime);
-        responses = blockFor();
+        this(replicaPlan, null, writeType, hintOnFailure, requestTime);
     }
 
-    public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, WriteType 
writeType, Supplier<Mutation> hintOnFailure, Dispatcher.RequestTime requestTime)
+    /**
+     * @param callback           A callback to be called when the write is 
successful.
+     * @param hintOnFailure      Enable/disable hinting on write failure
+     * @param requestTime        Initial request time of the mutation to be 
used for timeouts and backpressure calculation
+     */
+    public WriteResponseHandler(ForWrite replicaPlan,
+                                           Runnable callback,
+                                           WriteType writeType,
+                                           Supplier<Mutation> hintOnFailure,
+                                           Dispatcher.RequestTime requestTime)

Review Comment:
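   Formatting: the continuation parameters are not aligned with the first 
parameter. The javadoc is also partial: `replicaPlan` and `writeType` have no 
`@param` entries.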



##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -1307,14 +1312,16 @@ else if (!firstColumnFamilyStore.equals(store))
         }
     }
 
-    private static void syncWriteToBatchlog(Collection<Mutation> mutations, 
ReplicaPlan.ForWrite replicaPlan, TimeUUID uuid, Dispatcher.RequestTime 
requestTime)
-    throws WriteTimeoutException, WriteFailureException
+    private static void syncWriteToBatchlog(Collection<Mutation> mutations,
+                                            ReplicaPlan.ForWrite replicaPlan,
+                                            TimeUUID uuid,
+                                            Dispatcher.RequestTime requestTime)
+                                        throws WriteTimeoutException, 
WriteFailureException

Review Comment:
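   Formatting nit: the `throws` clause is now indented to an arbitrary column; 
previously it sat at the same indentation as the method declaration.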



##########
test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java:
##########
@@ -60,6 +63,8 @@
 
 public class WriteResponseHandlerTest
 {
+    protected static final Logger logger = 
LoggerFactory.getLogger(WriteResponseHandlerTest.class);
+

Review Comment:
   Is this `logger` unused?



##########
test/unit/org/apache/cassandra/net/CallbackPropertyTest.java:
##########
@@ -0,0 +1,750 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.ReadResponseTest;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.TruncateResponse;
+import org.apache.cassandra.db.WriteType;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.exceptions.TruncateException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.EachQuorumResponseHandler;
+import org.apache.cassandra.service.LocalQuorumResponseHandler;
+import org.apache.cassandra.service.TruncateResponseHandler;
+import org.apache.cassandra.service.WriteResponseHandler;
+import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.PrepareResponse;
+import org.apache.cassandra.service.paxos.v1.PrepareCallback;
+import org.apache.cassandra.service.paxos.v1.ProposeCallback;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.transport.Dispatcher;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.RandomHelpers;
+import org.quicktheories.generators.SourceDSL;
+
+import static org.apache.cassandra.Util.token;
+import static org.apache.cassandra.locator.ReplicaUtils.EP1;
+import static org.apache.cassandra.locator.ReplicaUtils.EP2;
+import static org.apache.cassandra.locator.ReplicaUtils.EP3;
+import static org.apache.cassandra.locator.ReplicaUtils.EP4;
+import static org.apache.cassandra.locator.ReplicaUtils.EP5;
+import static org.apache.cassandra.locator.ReplicaUtils.EP6;
+import static org.apache.cassandra.locator.ReplicaUtils.EP7;
+import static org.apache.cassandra.locator.ReplicaUtils.EP8;
+import static org.apache.cassandra.locator.ReplicaUtils.EP9;
+import static org.apache.cassandra.locator.ReplicaUtils.EP10;
+import static org.apache.cassandra.locator.ReplicaUtils.UNIQUE_EP;
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+import static org.quicktheories.QuickTheory.qt;
+
+@SuppressWarnings({ "unchecked", "rawtypes", "ZeroLengthArrayAllocation" })
+public class CallbackPropertyTest
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CallbackPropertyTest.class);
+
+    private enum CallbackType
+    {
+        NORMAL,
+        PAXOS,
+        TRUNCATE,
+        EACH_QUORUM,
+        LOCAL_QUORUM
+    }
+
+    /** Enable to print out details of the inferred test configuration setup 
**/
+    private static boolean DEBUG = false;
+
+    public static final String CF_STANDARD = "Standard1";
+
+    public static Keyspace ks;
+    public static ColumnFamilyStore cfs;
+    public static TableMetadata cfm;
+
+    private static final String DC1 = "datacenter1";
+    private static final String DC2 = "datacenter2";
+
+    /** By default, we'll test the CL's listed here unless the runner 
otherwise constrains things */
+    private EnumSet<ConsistencyLevel> testedCL = 
EnumSet.of(ConsistencyLevel.ALL, ConsistencyLevel.QUORUM, ConsistencyLevel.ONE);
+
+    public static DecoratedKey dk;
+
+    private SinglePartitionReadCommand command;
+
+    private ReplicaPlan.Shared readReplicaPlan;
+    private ReplicaPlan.ForWrite writeReplicaPlan;
+    private ReplicaPlan.ForPaxosWrite paxosReplicaPlan;
+    private ReplicaPlan.ForWrite eachQuorumPlan;
+    private ReplicaPlan.ForWrite localQuorumPlan;
+
+    private DataResolver resolver;
+    private Dispatcher.RequestTime requestTime;
+
+    private int dc1NodesDown;
+    private int dc2NodesDown = 0;
+    private int rf;
+    private ConsistencyLevel cl;
+    private int quorum;
+
+    /** We calculate whether we expect a test to succeed in the runner to then 
compare against from inside the test case's context */
+    private boolean caseExpectsException;
+
+    private Runnable testRunnable;
+
+    @BeforeClass
+    public static void setupClass()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+
+        // No need to wait so long
+        DatabaseDescriptor.setReadRpcTimeout(100);
+        DatabaseDescriptor.setWriteRpcTimeout(100);
+
+        ServerTestUtils.prepareServerNoRegister();
+
+        ClusterMetadataTestHelper.register(EP1, DC1, "R1");
+        ClusterMetadataTestHelper.register(EP2, DC1, "R1");
+        ClusterMetadataTestHelper.register(EP3, DC1, "R1");
+        ClusterMetadataTestHelper.register(EP4, DC1, "R1");
+        ClusterMetadataTestHelper.register(EP5, DC1, "R1");
+        ClusterMetadataTestHelper.register(EP6, DC2, "R2");
+        ClusterMetadataTestHelper.register(EP7, DC2, "R2");
+        ClusterMetadataTestHelper.register(EP8, DC2, "R2");
+        ClusterMetadataTestHelper.register(EP9, DC2, "R2");
+        ClusterMetadataTestHelper.register(EP10, DC2, "R2");
+        // We need to add endpoint's based on 0-4 == datacenter1, 5-9 == 
datacenter2.
+        for (int i = 0; i < 5; i++)
+        {
+            InetAddressAndPort peer1 = UNIQUE_EP.get(i);
+            InetAddressAndPort peer2 = UNIQUE_EP.get(i + 5);
+
+            Token t1 = token(i);
+            Token t2 = token(i + 5);
+
+            ClusterMetadataTestHelper.join(peer1, t1);
+            ClusterMetadataTestHelper.join(peer2, t2);
+        }
+    }
+
+    private void runTestMatrix(CallbackType type, Runnable r)
+    {
+        int minRF = 1;
+        testRunnable = r;
+
+        // Based on the type of callback we're testing we'll constrain various 
properties
+        switch (type)
+        {
+            case PAXOS:
+                minRF = 2;
+                testedCL = EnumSet.of(ConsistencyLevel.SERIAL);

Review Comment:
   Won't overwriting the class-level `testedCL` field here mean that the tested 
CLs depend on the order in which `runTestMatrix` is called, unless the field is 
reset for each `type`?



##########
test/unit/org/apache/cassandra/utils/RandomHelpers.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.UUID;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.math3.random.MersenneTwister;
+import org.apache.commons.math3.random.RandomGenerator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.quicktheories.core.DetatchedRandomnessSource;
+import org.quicktheories.core.RandomnessSource;
+import org.quicktheories.impl.Constraint;
+
+import static org.apache.cassandra.config.CassandraRelevantEnv.USER_SEED;
+
+/**
+ * Provides a more uniform generator w/a longer period, replacability, and 
some convenience methods around seeds for reproducibility.
+ */
+@SuppressWarnings("JavadocReference")
+public final class RandomHelpers
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(RandomHelpers.class);
+    private static long seed = Clock.Global.currentTimeMillis();
+    private static RandomGenerator rg = new MersenneTwister(seed);
+    private static final SimpleRandomSource srs = new SimpleRandomSource(seed);
+
+    @SuppressWarnings("unused")
+    public static void replaceRandomGenerator(RandomGenerator newGen)
+    {
+        rg = newGen;
+    }
+
+    /**
+     * Will set the seed to the user defined value in env var {@link 
#USER_SEED} if set, otherwise current time in millis
+     */
+    public static void setSeed()
+    {
+        long newSeed = USER_SEED.getString() != null
+               ? Long.parseLong(USER_SEED.getString())
+               : Clock.Global.currentTimeMillis();
+        setSeed(newSeed);
+    }
+
+    public static void setSeed(long newSeed)
+    {
+        rg.setSeed(newSeed);
+        srs.setSeed(newSeed);
+    }
+
+    public static void printSeed(String context)
+    {
+        logger.info("Random seed for context: " + context + ": " + seed);

Review Comment:
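   This will print a stale value if `setSeed` has been called: `setSeed(long)` 
reseeds `rg` and `srs` but never updates the `seed` field. Or is this only 
intended for printing the seed when a problem occurs?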



##########
test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java:
##########
@@ -28,6 +28,9 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+

Review Comment:
   Are these imports unused?



##########
test/unit/org/apache/cassandra/utils/RandomHelpers.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.UUID;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.math3.random.MersenneTwister;
+import org.apache.commons.math3.random.RandomGenerator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.quicktheories.core.DetatchedRandomnessSource;
+import org.quicktheories.core.RandomnessSource;
+import org.quicktheories.impl.Constraint;
+
+import static org.apache.cassandra.config.CassandraRelevantEnv.USER_SEED;
+
+/**
+ * Provides a more uniform generator w/a longer period, replacability, and 
some convenience methods around seeds for reproducibility.
+ */
+@SuppressWarnings("JavadocReference")
+public final class RandomHelpers
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(RandomHelpers.class);
+    private static long seed = Clock.Global.currentTimeMillis();
+    private static RandomGenerator rg = new MersenneTwister(seed);
+    private static final SimpleRandomSource srs = new SimpleRandomSource(seed);
+
+    @SuppressWarnings("unused")
+    public static void replaceRandomGenerator(RandomGenerator newGen)
+    {
+        rg = newGen;
+    }
+
+    /**
+     * Will set the seed to the user defined value in env var {@link 
#USER_SEED} if set, otherwise current time in millis
+     */
+    public static void setSeed()
+    {
+        long newSeed = USER_SEED.getString() != null
+               ? Long.parseLong(USER_SEED.getString())
+               : Clock.Global.currentTimeMillis();
+        setSeed(newSeed);
+    }
+
+    public static void setSeed(long newSeed)
+    {
+        rg.setSeed(newSeed);
+        srs.setSeed(newSeed);
+    }
+
+    public static void printSeed(String context)
+    {
+        logger.info("Random seed for context: " + context + ": " + seed);
+    }
+
+    public static long getSeed()
+    {
+        return seed;
+    }
+
+    public static String makeRandomStringBounded(int maxLength)
+    {
+        return makeRandomString(nextInt(maxLength));
+    }
+
+    // Creates a random string
+    public static String makeRandomString(int length)
+    {
+        char[] chars = new char[length];
+        for (int i = 0; i < length; ++i)
+            chars[i++] = (char) ('a' + rg.nextInt('z' - 'a' + 1));
+        return new String(chars);
+    }
+
+    public static long nextLong()
+    {
+        return rg.nextLong();
+    }
+
+    public static int nextInt()
+    {
+        return rg.nextInt();
+    }
+
+    public static int nextInt(int bound)
+    {
+        return rg.nextInt(bound);
+    }
+
+    public static void nextBytes(byte[] buffer)
+    {
+        rg.nextBytes(buffer);
+    }
+
+    public static UUID nextUUID()
+    {
+        return Generators.UUID_RANDOM_GEN.generate(srs);
+    }
+
+    /**
+     * TODO: Add {@code Gen<BigInteger>} support into {@link 
AbstractTypeGenerators#PRIMITIVE_TYPE_DATA_GENS} and call that, similar to 
{@link #nextUUID}
+     * @param bounds Max number of bytes for the BigInteger
+     */
+    public static BigInteger nextBoundedBigInteger(int bounds)
+    {
+        return nextBigInteger(nextInt(bounds));
+    }
+
+    public static BigInteger nextBigInteger(int numBytes)
+    {
+        Preconditions.checkArgument(numBytes >= 0);
+        if (numBytes == 0)
+            return BigInteger.ZERO;
+
+        ByteBuffer bb = ByteBuffer.wrap(new byte[numBytes]);
+        for (int i = 0; i < numBytes / 4; i++)
+            bb.putInt(nextInt());
+        return new BigInteger(bb.array());
+    }
+
+    /**
+     * TODO: Add {@code Gen<BigDecimal>} support into {@link 
AbstractTypeGenerators#PRIMITIVE_TYPE_DATA_GENS} and call that, similar to 
{@link #nextUUID}

Review Comment:
   Is this TODO still outstanding?



##########
test/unit/org/apache/cassandra/utils/RandomHelpers.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.UUID;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.math3.random.MersenneTwister;
+import org.apache.commons.math3.random.RandomGenerator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.quicktheories.core.DetatchedRandomnessSource;
+import org.quicktheories.core.RandomnessSource;
+import org.quicktheories.impl.Constraint;
+
+import static org.apache.cassandra.config.CassandraRelevantEnv.USER_SEED;
+
+/**
+ * Provides a more uniform generator w/a longer period, replacability, and 
some convenience methods around seeds for reproducibility.
+ */
+@SuppressWarnings("JavadocReference")
+public final class RandomHelpers
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(RandomHelpers.class);
+    private static long seed = Clock.Global.currentTimeMillis();
+    private static RandomGenerator rg = new MersenneTwister(seed);
+    private static final SimpleRandomSource srs = new SimpleRandomSource(seed);
+
+    @SuppressWarnings("unused")
+    public static void replaceRandomGenerator(RandomGenerator newGen)
+    {
+        rg = newGen;
+    }
+
+    /**
+     * Will set the seed to the user defined value in env var {@link 
#USER_SEED} if set, otherwise current time in millis
+     */
+    public static void setSeed()
+    {
+        long newSeed = USER_SEED.getString() != null
+               ? Long.parseLong(USER_SEED.getString())
+               : Clock.Global.currentTimeMillis();
+        setSeed(newSeed);
+    }
+
+    public static void setSeed(long newSeed)
+    {
+        rg.setSeed(newSeed);
+        srs.setSeed(newSeed);
+    }
+
+    public static void printSeed(String context)
+    {
+        logger.info("Random seed for context: " + context + ": " + seed);
+    }
+
+    public static long getSeed()
+    {
+        return seed;
+    }
+
+    public static String makeRandomStringBounded(int maxLength)
+    {
+        return makeRandomString(nextInt(maxLength));
+    }
+
+    // Creates a random string
+    public static String makeRandomString(int length)
+    {
+        char[] chars = new char[length];
+        for (int i = 0; i < length; ++i)
+            chars[i++] = (char) ('a' + rg.nextInt('z' - 'a' + 1));
+        return new String(chars);
+    }
+
+    public static long nextLong()
+    {
+        return rg.nextLong();
+    }
+
+    public static int nextInt()
+    {
+        return rg.nextInt();
+    }
+
+    public static int nextInt(int bound)
+    {
+        return rg.nextInt(bound);
+    }
+
+    public static void nextBytes(byte[] buffer)
+    {
+        rg.nextBytes(buffer);
+    }
+
+    public static UUID nextUUID()
+    {
+        return Generators.UUID_RANDOM_GEN.generate(srs);
+    }
+
+    /**
+     * TODO: Add {@code Gen<BigInteger>} support into {@link 
AbstractTypeGenerators#PRIMITIVE_TYPE_DATA_GENS} and call that, similar to 
{@link #nextUUID}
+     * @param bounds Max number of bytes for the BigInteger
+     */
+    public static BigInteger nextBoundedBigInteger(int bounds)
+    {
+        return nextBigInteger(nextInt(bounds));
+    }
+
+    public static BigInteger nextBigInteger(int numBytes)
+    {
+        Preconditions.checkArgument(numBytes >= 0);
+        if (numBytes == 0)
+            return BigInteger.ZERO;
+
+        ByteBuffer bb = ByteBuffer.wrap(new byte[numBytes]);
+        for (int i = 0; i < numBytes / 4; i++)
+            bb.putInt(nextInt());

Review Comment:
   `org.apache.cassandra.index.sai.SAITester.Randomization#nextBigInteger(int)` 
is an interesting alternative: it passes the random source into `BigInteger` 
to generate the value. Also, `nextBoundedBigInteger`, which calls this method, 
doesn't appear to be used anywhere.



##########
src/java/org/apache/cassandra/service/paxos/v1/PrepareCallback.java:
##########
@@ -40,30 +41,43 @@ public class PrepareCallback extends 
AbstractPaxosCallback<PrepareResponse>
 {
     private static final Logger logger = 
LoggerFactory.getLogger(PrepareCallback.class);
 
-    public boolean promised = true;
-    public Commit mostRecentCommit;
-    public Commit mostRecentInProgressCommit;
+    private boolean promised = true;
+    private Commit mostRecentCommit;
+    private Commit mostRecentInProgressCommit;
 
     private final Map<InetAddressAndPort, Commit> commitsByReplica = new 
ConcurrentHashMap<>();
 
-    public PrepareCallback(DecoratedKey key, TableMetadata metadata, int 
targets, ConsistencyLevel consistency, Dispatcher.RequestTime requestTime)
+
+    public PrepareCallback(DecoratedKey key, TableMetadata metadata, 
ReplicaPlan.ForPaxosWrite replicaPlan, Dispatcher.RequestTime requestTime)
     {
-        super(targets, consistency, requestTime);
+        super(replicaPlan.contacts().endpoints(), 
replicaPlan.requiredParticipants(), replicaPlan.consistencyLevel(), 
requestTime);
         // need to inject the right key in the empty commit so comparing with 
empty commits in the response works as expected
         mostRecentCommit = Commit.emptyCommit(key, metadata);
         mostRecentInProgressCommit = Commit.emptyCommit(key, metadata);
     }
 
-    public synchronized void onResponse(Message<PrepareResponse> message)
+    public boolean isPromised() { return promised; }
+    public Commit getMostRecentCommit() { return mostRecentCommit; }
+    public Commit getMostRecentInProgressCommit() { return 
mostRecentInProgressCommit; }
+
+    public synchronized void onResponse(Message<PrepareResponse> msg)
     {
-        PrepareResponse response = message.payload;
-        logger.trace("Prepare response {} from {}", response, message.from());
+        Preconditions.checkNotNull(msg, "Got unexpected null message in 
onResponse callback.");
+        PrepareResponse response = msg.payload;
+        tracker.recordResponse(msg.from());
+        logger.trace("Prepare response {} from {}", response, msg.from());
+
+        // We don't want to flip the promise state if we've already succeeded 
or failed on this callback; if we've hit
+        // a quorum promised subsequent failures shouldn't be considered in 
our calculation.
+        if (latch.count() == 0)
+            return;
 
         // We set the mostRecentInProgressCommit even if we're not promised 
as, in that case, the ballot of that commit
         // will be used to avoid generating a ballot that has not chance to 
win on retry (think clock skew).
         if (response.inProgressCommit.isAfter(mostRecentInProgressCommit))
             mostRecentInProgressCommit = response.inProgressCommit;
 
+        // Immediately bail out if any participant declines
         if (!response.promised)
         {
             promised = false;

Review Comment:
   should this be DRYed up with `signal()`?



##########
src/java/org/apache/cassandra/service/WriteResponseHandler.java:
##########
@@ -17,58 +17,375 @@
  */
 package org.apache.cassandra.service;
 
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.locator.ReplicaPlan;
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.WriteType;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlan.ForWrite;
+import org.apache.cassandra.net.CallbackResponseTracker;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.transport.Dispatcher;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Condition;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
-/**
- * Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL 
consistency levels.
- */
-public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
+import static java.lang.Long.MAX_VALUE;
+import static java.lang.Math.min;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static 
org.apache.cassandra.config.DatabaseDescriptor.getCounterWriteRpcTimeout;
+import static 
org.apache.cassandra.config.DatabaseDescriptor.getWriteRpcTimeout;
+import static org.apache.cassandra.db.WriteType.COUNTER;
+import static org.apache.cassandra.locator.Replicas.countInOurDc;
+import static org.apache.cassandra.schema.Schema.instance;
+import static org.apache.cassandra.service.StorageProxy.WritePerformer;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+import static 
org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class WriteResponseHandler<T> implements RequestCallback<T>
 {
     protected static final Logger logger = 
LoggerFactory.getLogger(WriteResponseHandler.class);
 
-    protected volatile int responses;
-    private static final AtomicIntegerFieldUpdater<WriteResponseHandler> 
responsesUpdater
-            = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, 
"responses");
+    private final Condition condition = newOneTimeCondition();
+    protected final ReplicaPlan.ForWrite replicaPlan;
+
+    protected final Runnable callback;
+    protected final WriteType writeType;
 
-    public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan,
-                                Runnable callback,
-                                WriteType writeType,
-                                Supplier<Mutation> hintOnFailure,
-                                Dispatcher.RequestTime requestTime)
+    private static final AtomicIntegerFieldUpdater<WriteResponseHandler> 
failuresUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, 
"failureCount");
+    private volatile int failureCount = 0;
+
+    private final AtomicBoolean recordedMetric = new AtomicBoolean(false);
+
+    protected final Dispatcher.RequestTime requestTime;
+    protected final CallbackResponseTracker tracker;
+    private @Nullable final Supplier<Mutation> hintOnFailure;
+
+    /**
+     * The count of all the replicas we'd need to hit across DC's to qualify 
as hitting our "ideal"
+     *
+     * Of note, this isn't a calculation of what would qualify as a quorum 
across all DC's, i.e. "EACH_QUORUM", but rather
+     * a brute force calculation of what would equate to CL_ALL. This is how 
we originally calculated things on implementation
+     * in CASSANDRA-13289 so we continue with that. It would be relatively 
trivial to extend this logic to include
+     * a per-DC calculation similar to how we track different DC responses in 
{@link EachQuorumResponseHandler#pendingResponsesPerDC}
+     */
+    @SuppressWarnings("JavadocReference")
+    protected final int idealCLReplicaCount;
+
+    public WriteResponseHandler(ForWrite replicaPlan,
+                                   WriteType writeType,
+                                   Supplier<Mutation> hintOnFailure,
+                                   Dispatcher.RequestTime requestTime)
     {
-        super(replicaPlan, callback, writeType, hintOnFailure, requestTime);
-        responses = blockFor();
+        this(replicaPlan, null, writeType, hintOnFailure, requestTime);
     }
 
-    public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, WriteType 
writeType, Supplier<Mutation> hintOnFailure, Dispatcher.RequestTime requestTime)
+    /**
+     * @param callback           A callback to be called when the write is 
successful.
+     * @param hintOnFailure      Enable/disable hinting on write failure
+     * @param requestTime        Initial request time of the mutation to be 
used for timeouts and backpressure calculation
+     */
+    public WriteResponseHandler(ForWrite replicaPlan,
+                                           Runnable callback,
+                                           WriteType writeType,
+                                           Supplier<Mutation> hintOnFailure,
+                                           Dispatcher.RequestTime requestTime)
     {
-        this(replicaPlan, null, writeType, hintOnFailure, requestTime);
+        this.replicaPlan = replicaPlan;
+        this.callback = callback;
+        this.writeType = writeType;
+        this.hintOnFailure = hintOnFailure;
+        this.tracker = new 
CallbackResponseTracker(replicaPlan.contacts().endpoints(), 
WriteResponseHandler.blockFor(replicaPlan));
+        this.requestTime = requestTime;
+        this.idealCLReplicaCount = replicaPlan.contacts().size();
     }
 
-    public void onResponse(Message<T> m)
+    /**
+     * Intended for use in DC-aware CL's, we'll track the DC responses in the 
response tracker.
+     */
+    public void trackIdealCL(ReplicaPlan.ForWrite replicaPlan)
     {
-        replicaPlan.collectSuccess(m == null ? 
FBUtilities.getBroadcastAddressAndPort() : m.from());
-        if (responsesUpdater.decrementAndGet(this) == 0)
+        tracker.enableIdealCLTracking(replicaPlan);
+    }
+
+    public void get() throws WriteTimeoutException, WriteFailureException
+    {
+        long timeoutNanos = currentTimeoutNanos();
+
+        boolean signaled;
+        Map<InetAddressAndPort, RequestFailureReason> failuresByEndpoint;
+        try
+        {
+            signaled = condition.await(timeoutNanos, NANOSECONDS);
+            failuresByEndpoint = tracker.endProcessing();
+        }
+        catch (InterruptedException e)
+        {
+            throw new UncheckedInterruptedException(e);
+        }
+
+        if (signaled && receivedSufficientResponses())
+        {
+            replicaPlan.checkStillAppliesTo(ClusterMetadata.current());
+            return;
+        }
+
+        if (!signaled)
+            throwTimeout(failuresByEndpoint);
+
+        if (!receivedSufficientResponses())
+        {
+            // We only want to consider replicas that count towards our 
blockFor() / requiredResponses threshold here to flag as a timeout
+            if (RequestCallback.isTimeout(failuresByEndpoint.keySet().stream()
+                                                           
.filter(this::waitingFor)
+                                                           
.collect(Collectors.toMap(Function.identity(), failuresByEndpoint::get))))
+                throwTimeout(failuresByEndpoint);
+            throw new WriteFailureException(writeType, 
replicaPlan.consistencyLevel(), tracker.responseCount(), blockFor(), 
failuresByEndpoint);
+        }
+
+        replicaPlan.checkStillAppliesTo(ClusterMetadata.current());
+    }
+
+    private void throwTimeout(Map<InetAddressAndPort, RequestFailureReason> 
failures)
+    {
+        int blockedFor = blockFor();
+        int acks = tracker.responseCount();
+        // It's pretty unlikely, but we can race between exiting await above 
and here, so
+        // that we could now have enough acks. In that case, we "lie" on the 
acks count to
+        // avoid sending confusing info to the user (see CASSANDRA-6491).
+        if (acks >= blockedFor)
+            acks = blockedFor - 1;
+        throw WriteTimeoutException.withParticipants(writeType, 
replicaPlan.consistencyLevel(), acks, blockedFor, failures);
+    }
+
+    public final long currentTimeoutNanos()
+    {
+        long now = nanoTime();
+        long requestTimeout = writeType == COUNTER
+                              ? getCounterWriteRpcTimeout(NANOSECONDS)
+                              : getWriteRpcTimeout(NANOSECONDS);
+        return requestTime.computeTimeout(now, requestTimeout);
+    }
+
+    /**
+     * @return the minimum number of endpoints that must respond.
+     */
+    @VisibleForTesting
+    public int blockFor()
+    {
+        // During bootstrap, we have to include the pending endpoints or we 
may fail the consistency level
+        // guarantees (see #833)
+        return blockFor(replicaPlan);
+    }
+
+    public static int blockFor(ForWrite replicaPlan)
+    {
+        return replicaPlan.writeQuorum();
+    }
+
+    /**
+     * TODO: this method is brittle for its purpose of deciding when we should 
fail a query;
+     *       this needs to be aware of which nodes are live/down

Review Comment:
   is this TODO carried through in the refactor or should there be a JIRA here 
that tracks the work?



##########
test/unit/org/apache/cassandra/utils/RandomHelpers.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.UUID;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.math3.random.MersenneTwister;
+import org.apache.commons.math3.random.RandomGenerator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.quicktheories.core.DetatchedRandomnessSource;
+import org.quicktheories.core.RandomnessSource;
+import org.quicktheories.impl.Constraint;
+
+import static org.apache.cassandra.config.CassandraRelevantEnv.USER_SEED;
+
+/**
+ * Provides a more uniform generator w/a longer period, replacability, and 
some convenience methods around seeds for reproducibility.
+ */
+@SuppressWarnings("JavadocReference")
+public final class RandomHelpers
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(RandomHelpers.class);
+    private static long seed = Clock.Global.currentTimeMillis();
+    private static RandomGenerator rg = new MersenneTwister(seed);
+    private static final SimpleRandomSource srs = new SimpleRandomSource(seed);
+
+    @SuppressWarnings("unused")
+    public static void replaceRandomGenerator(RandomGenerator newGen)
+    {
+        rg = newGen;
+    }
+
+    /**
+     * Will set the seed to the user defined value in env var {@link 
#USER_SEED} if set, otherwise current time in millis
+     */
+    public static void setSeed()
+    {
+        long newSeed = USER_SEED.getString() != null
+               ? Long.parseLong(USER_SEED.getString())
+               : Clock.Global.currentTimeMillis();
+        setSeed(newSeed);
+    }
+
+    /**
+     * Reseeds both underlying random sources and records the seed, so that {@link #getSeed()} and
+     * {@link #printSeed(String)} report the value that is actually in use.
+     *
+     * @param newSeed the seed to apply to the generator and the randomness source
+     */
+    public static void setSeed(long newSeed)
+    {
+        // Keep the reported seed in sync; otherwise a failing run logs a seed that cannot reproduce it.
+        seed = newSeed;
+        rg.setSeed(newSeed);
+        srs.setSeed(newSeed);
+    }
+
+    /**
+     * Logs the currently recorded seed at INFO level.
+     *
+     * @param context free-form label identifying the caller, included in the log line
+     */
+    public static void printSeed(String context)
+    {
+        // Parameterized form defers formatting to the logger; the emitted text is unchanged.
+        logger.info("Random seed for context: {}: {}", context, seed);
+    }
+
+    public static long getSeed()
+    {
+        return seed;
+    }
+
+    public static String makeRandomStringBounded(int maxLength)
+    {
+        return makeRandomString(nextInt(maxLength));
+    }
+
+    /**
+     * Creates a random string of exactly {@code length} lowercase ASCII letters ('a' to 'z').
+     *
+     * @param length number of characters to generate
+     * @return the generated string
+     */
+    public static String makeRandomString(int length)
+    {
+        char[] chars = new char[length];
+        // The index advances only in the loop header; a second increment in the body would skip every
+        // other slot and leave it as '\0'.
+        for (int i = 0; i < length; ++i)
+            chars[i] = (char) ('a' + rg.nextInt('z' - 'a' + 1));
+        return new String(chars);
+    }
+
+    public static long nextLong()
+    {
+        return rg.nextLong();
+    }
+
+    public static int nextInt()
+    {
+        return rg.nextInt();
+    }
+
+    public static int nextInt(int bound)
+    {
+        return rg.nextInt(bound);
+    }
+
+    public static void nextBytes(byte[] buffer)
+    {
+        rg.nextBytes(buffer);
+    }
+
+    public static UUID nextUUID()
+    {
+        return Generators.UUID_RANDOM_GEN.generate(srs);
+    }
+
+    /**
+     * TODO: Add {@code Gen<BigInteger>} support into {@link 
AbstractTypeGenerators#PRIMITIVE_TYPE_DATA_GENS} and call that, similar to 
{@link #nextUUID}

Review Comment:
   still todo?



##########
src/java/org/apache/cassandra/batchlog/BatchlogManager.java:
##########
@@ -548,18 +548,25 @@ private static class ReplayWriteResponseHandler<T> 
extends WriteResponseHandler<
             }
 
             @Override
-            protected int blockFor()
+            public int blockFor()
             {
                 return this.replicaPlan.contacts().size();
             }
 
             @Override
             public void onResponse(Message<T> m)
             {
+                // tracker update handled in super
                 boolean removed = undelivered.remove(m == null ? 
FBUtilities.getBroadcastAddressAndPort() : m.from());
                 assert removed;
                 super.onResponse(m);
             }
+
+            @Override
+            public boolean receivedSufficientResponses()
+            {
+                return super.receivedSufficientResponses();
+            }

Review Comment:
   Why override? For debugging?



##########
test/unit/org/apache/cassandra/net/CallbackPropertyTest.java:
##########
@@ -0,0 +1,750 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.ReadResponseTest;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.TruncateResponse;
+import org.apache.cassandra.db.WriteType;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.exceptions.TruncateException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.EachQuorumResponseHandler;
+import org.apache.cassandra.service.LocalQuorumResponseHandler;
+import org.apache.cassandra.service.TruncateResponseHandler;
+import org.apache.cassandra.service.WriteResponseHandler;
+import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.PrepareResponse;
+import org.apache.cassandra.service.paxos.v1.PrepareCallback;
+import org.apache.cassandra.service.paxos.v1.ProposeCallback;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.transport.Dispatcher;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.RandomHelpers;
+import org.quicktheories.generators.SourceDSL;
+
+import static org.apache.cassandra.Util.token;
+import static org.apache.cassandra.locator.ReplicaUtils.EP1;
+import static org.apache.cassandra.locator.ReplicaUtils.EP2;
+import static org.apache.cassandra.locator.ReplicaUtils.EP3;
+import static org.apache.cassandra.locator.ReplicaUtils.EP4;
+import static org.apache.cassandra.locator.ReplicaUtils.EP5;
+import static org.apache.cassandra.locator.ReplicaUtils.EP6;
+import static org.apache.cassandra.locator.ReplicaUtils.EP7;
+import static org.apache.cassandra.locator.ReplicaUtils.EP8;
+import static org.apache.cassandra.locator.ReplicaUtils.EP9;
+import static org.apache.cassandra.locator.ReplicaUtils.EP10;
+import static org.apache.cassandra.locator.ReplicaUtils.UNIQUE_EP;
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+import static org.quicktheories.QuickTheory.qt;
+
+@SuppressWarnings({ "unchecked", "rawtypes", "ZeroLengthArrayAllocation" })
+public class CallbackPropertyTest
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CallbackPropertyTest.class);
+
+    private enum CallbackType
+    {
+        NORMAL,
+        PAXOS,
+        TRUNCATE,
+        EACH_QUORUM,
+        LOCAL_QUORUM
+    }
+
+    /** Enable to print out details of the inferred test configuration setup 
**/
+    private static boolean DEBUG = false;
+
+    public static final String CF_STANDARD = "Standard1";
+
+    public static Keyspace ks;
+    public static ColumnFamilyStore cfs;
+    public static TableMetadata cfm;
+
+    private static final String DC1 = "datacenter1";
+    private static final String DC2 = "datacenter2";
+
+    /** By default, we'll test the CL's listed here unless the runner 
otherwise constrains things */
+    private EnumSet<ConsistencyLevel> testedCL = 
EnumSet.of(ConsistencyLevel.ALL, ConsistencyLevel.QUORUM, ConsistencyLevel.ONE);
+
+    public static DecoratedKey dk;
+
+    private SinglePartitionReadCommand command;
+
+    private ReplicaPlan.Shared readReplicaPlan;
+    private ReplicaPlan.ForWrite writeReplicaPlan;
+    private ReplicaPlan.ForPaxosWrite paxosReplicaPlan;
+    private ReplicaPlan.ForWrite eachQuorumPlan;
+    private ReplicaPlan.ForWrite localQuorumPlan;
+
+    private DataResolver resolver;
+    private Dispatcher.RequestTime requestTime;
+
+    private int dc1NodesDown;
+    private int dc2NodesDown = 0;
+    private int rf;
+    private ConsistencyLevel cl;
+    private int quorum;
+
+    /** We calculate whether we expect a test to succeed in the runner to then 
compare against from inside the test case's context */
+    private boolean caseExpectsException;
+
+    private Runnable testRunnable;
+
+    @BeforeClass
+    public static void setupClass()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+
+        // No need to wait so long
+        DatabaseDescriptor.setReadRpcTimeout(100);
+        DatabaseDescriptor.setWriteRpcTimeout(100);
+
+        ServerTestUtils.prepareServerNoRegister();
+
+        ClusterMetadataTestHelper.register(EP1, DC1, "R1");
+        ClusterMetadataTestHelper.register(EP2, DC1, "R1");
+        ClusterMetadataTestHelper.register(EP3, DC1, "R1");
+        ClusterMetadataTestHelper.register(EP4, DC1, "R1");
+        ClusterMetadataTestHelper.register(EP5, DC1, "R1");
+        ClusterMetadataTestHelper.register(EP6, DC2, "R2");
+        ClusterMetadataTestHelper.register(EP7, DC2, "R2");
+        ClusterMetadataTestHelper.register(EP8, DC2, "R2");
+        ClusterMetadataTestHelper.register(EP9, DC2, "R2");
+        ClusterMetadataTestHelper.register(EP10, DC2, "R2");
+        // We need to add endpoint's based on 0-4 == datacenter1, 5-9 == 
datacenter2.
+        for (int i = 0; i < 5; i++)
+        {
+            InetAddressAndPort peer1 = UNIQUE_EP.get(i);
+            InetAddressAndPort peer2 = UNIQUE_EP.get(i + 5);
+
+            Token t1 = token(i);
+            Token t2 = token(i + 5);
+
+            ClusterMetadataTestHelper.join(peer1, t1);
+            ClusterMetadataTestHelper.join(peer2, t2);
+        }
+    }
+
+    /**
+     * Runs {@code r} against generated parameter combinations for the given callback type.
+     *
+     * The callback type constrains the minimum replication factor and the consistency levels exercised.
+     * EACH_QUORUM and LOCAL_QUORUM are driven through {@link #runMultiDCTest}; everything else goes
+     * through {@link #runSingleDCTest}.
+     *
+     * @param type the kind of callback under test
+     * @param r    the test body; stored in {@code testRunnable} and invoked by the per-case runner
+     */
+    private void runTestMatrix(CallbackType type, Runnable r)
+    {
+        int minRF = 1;
+        testRunnable = r;
+
+        // Based on the type of callback we're testing we'll constrain various properties
+        switch (type)
+        {
+            case PAXOS:
+                minRF = 2;
+                testedCL = EnumSet.of(ConsistencyLevel.SERIAL);
+                break;
+            case TRUNCATE:
+                testedCL = EnumSet.of(ConsistencyLevel.ALL);
+                cl = ConsistencyLevel.ALL;
+                break;
+            case EACH_QUORUM:
+                cl = ConsistencyLevel.EACH_QUORUM;
+                break;
+            case LOCAL_QUORUM:
+                cl = ConsistencyLevel.LOCAL_QUORUM;
+                break;
+            case NORMAL:
+            default:
+                // pass -> respect CL and test them all
+        }
+
+        if (type == CallbackType.EACH_QUORUM || type == CallbackType.LOCAL_QUORUM)
+        {
+            qt().forAll(
+                SourceDSL.integers().between(minRF, 5).describedAs(rf -> "rf: " + rf),
+                SourceDSL.integers().between(0, 5).describedAs(nd1 -> "DC1NodesDown: " + nd1),
+                SourceDSL.integers().between(0, 5).describedAs(nd2 -> "DC2NodesDown: " + nd2)
+                )
+                .checkAssert(this::runMultiDCTest);
+        }
+        else
+        {
+            qt().forAll(
+                SourceDSL.integers().between(minRF, 10).describedAs(rf -> "rf: " + rf),
+                SourceDSL.arbitrary().pick(testedCL.toArray(new ConsistencyLevel[0])).describedAs(cl -> "cl: " + cl),
+                // Describe the generated value itself, not the dc1NodesDown field left over from an earlier case
+                SourceDSL.integers().between(0, 10).describedAs(nd -> "nodesDown: " + nd)
+                )
+                .checkAssert(this::runSingleDCTest);
+        }
+    }
+
+    /**
+     * Sets up one generated single-DC test case and runs the stored test body against it.
+     *
+     * Creates a fresh keyspace with {@code KeyspaceParams.simple(rf)}, builds the read, write and paxos
+     * replica plans over the first {@code rf} entries of {@code UNIQUE_EP}, records whether the case is
+     * expected to raise an exception, and finally invokes {@code testRunnable}.
+     *
+     * @param rf        replication factor for the keyspace and number of replicas in each plan
+     * @param cl        consistency level under test
+     * @param nodesDown number of replicas treated as down; cases with {@code nodesDown > rf} are skipped
+     */
+    private void runSingleDCTest(int rf, ConsistencyLevel cl, int nodesDown)
+    {
+        if (nodesDown > rf) return;  // Invalid scenario, skip
+
+        this.rf = rf;
+        this.cl = cl;
+        this.dc1NodesDown = nodesDown;
+
+        String ksName = "RequestCallbackTest" + rf + '_' + cl + '_' + nodesDown + System.nanoTime();
+        TableMetadata.Builder builder = TableMetadata.builder(ksName, CF_STANDARD)
+                                                     .addPartitionKeyColumn("key", BytesType.instance)
+                                                     .addClusteringColumn("col1", AsciiType.instance)
+                                                     .addRegularColumn("c1", AsciiType.instance)
+                                                     .addRegularColumn("c2", AsciiType.instance)
+                                                     .addRegularColumn("one", AsciiType.instance)
+                                                     .addRegularColumn("two", AsciiType.instance);
+
+        SchemaLoader.createKeyspace(ksName, KeyspaceParams.simple(rf), builder);
+
+        this.quorum = ReplicaPlans.calculateQuorumMajority(rf);
+        if (cl == ConsistencyLevel.ALL) this.quorum = rf;
+        if (cl == ConsistencyLevel.ONE) this.quorum = 1;
+
+        this.caseExpectsException = rf - nodesDown < quorum;
+
+        logger.info("RUNNING TEST CASE: rf: {}. cl: {}. nodesDown: {}. quorum: {}. expectedException: {}", rf, cl, nodesDown, quorum, caseExpectsException);
+
+        ks = Keyspace.open(ksName);
+        cfs = ks.getColumnFamilyStore(CF_STANDARD);
+        cfm = cfs.metadata();
+        dk = Util.dk("key1");
+        Token t = Murmur3Partitioner.instance.getToken(dk.getKey());
+        command = SinglePartitionReadCommand.fullPartitionRead(cfm, FBUtilities.nowInSeconds(), dk);
+
+        List<Replica> replicas = new ArrayList<>();
+        for (int i = 0; i < rf; i++)
+            replicas.add(full(UNIQUE_EP.get(i)));
+
+        EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), replicas.toArray(new Replica[0]));
+        EndpointsForToken emptyReplicas = EndpointsForToken.empty(dk.getToken());
+
+        // Since all callbacks excepting multi-DC cl are handled by this test runner, we init replica plans we may not use here. /shrug
+        ReplicaPlan.ForTokenRead plan = ReplicaPlans.forRead(ks, dk.getToken(), null, cl, cfs.metadata().params.speculativeRetry);
+        readReplicaPlan = ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks,
+                                                                          ks.getReplicationStrategy(),
+                                                                          cl,
+                                                                          targetReplicas,
+                                                                          targetReplicas,
+                                                                          (newClusterMetadata) -> plan,
+                                                                          (self) -> { throw new IllegalStateException("Read repair is not supported for these tests"); },
+                                                                          ClusterMetadata.current().epoch)
+        );
+
+        Function<ClusterMetadata, ReplicaPlan.ForWrite> writeRecompute = (newClusterMetadata) -> ReplicaPlans.forWrite(ks, cl, t, ReplicaPlans.writeNormal);
+        writeReplicaPlan = new ReplicaPlan.ForWrite(ks, ks.getReplicationStrategy(), cl, emptyReplicas, targetReplicas, emptyReplicas, targetReplicas, writeRecompute, ClusterMetadata.current().epoch);
+
+        int paxosRequired = ReplicaPlans.calculateQuorumMajority(targetReplicas.size());
+        Function<ClusterMetadata, ReplicaPlan.ForWrite> paxosRecompute = (newClusterMetadata) -> ReplicaPlans.forPaxos(ks, dk, cl);
+        paxosReplicaPlan = ReplicaPlan.ForPaxosWrite.forTest(ks, cl, emptyReplicas, targetReplicas, emptyReplicas, targetReplicas, paxosRequired, paxosRecompute, ClusterMetadata.current().epoch);
+
+        // Stamp the request time before anything consumes it: the read repair and resolver below capture
+        // this value, so assigning it afterwards would hand them null (first case) or the previous case's time.
+        requestTime = new Dispatcher.RequestTime(Clock.Global.nanoTime());
+
+        ReadRepair readRepair = ReadRepair.create(command, readReplicaPlan, requestTime);
+        resolver = new DataResolver(command, readReplicaPlan, readRepair, requestTime);
+
+        testRunnable.run();
+    }
+
+    private void runMultiDCTest(int rf, int dc1NodesDown, int dc2NodesDown)
+    {
+        if (dc1NodesDown > rf) return;  // invalid param arrangement; skip
+        if (dc2NodesDown > rf) return;  // invalid param arrangement; skip
+
+        this.rf = rf;
+        this.quorum = ReplicaPlans.calculateQuorumMajority(rf);
+        this.dc1NodesDown = dc1NodesDown;
+        this.dc2NodesDown = dc2NodesDown;
+
+        // Only care about DC1 on local quorum check
+        caseExpectsException = cl == ConsistencyLevel.LOCAL_QUORUM
+                               ? rf - dc1NodesDown < quorum
+                               : rf - dc1NodesDown < quorum || rf - 
dc2NodesDown < quorum;
+
+        String ksName = "RequestCallbackTest" + rf + '_' + cl + '_' + 
dc1NodesDown + '_' + dc2NodesDown;
+        TableMetadata.Builder builder = TableMetadata.builder(ksName, 
CF_STANDARD)
+                                                     
.addPartitionKeyColumn("key", BytesType.instance)
+                                                     
.addClusteringColumn("col1", AsciiType.instance)
+                                                     .addRegularColumn("c1", 
AsciiType.instance)
+                                                     .addRegularColumn("c2", 
AsciiType.instance)
+                                                     .addRegularColumn("one", 
AsciiType.instance)
+                                                     .addRegularColumn("two", 
AsciiType.instance);
+
+        logger.info("RUNNING TEST CASE: rf: {}. cl: {}. dc1NodesDown: {}. 
dc2NodesDown: {}, quorum: {}. expectedException: {}", rf, cl, dc1NodesDown, 
dc2NodesDown, quorum, caseExpectsException);
+        KeyspaceParams params = KeyspaceParams.nts(DC1, rf, DC2, rf);
+        SchemaLoader.createKeyspace(ksName, params, builder);
+
+        ks = Keyspace.open(ksName);
+        cfs = ks.getColumnFamilyStore(CF_STANDARD);
+        cfm = cfs.metadata();
+        dk = Util.dk("key1");
+        Token t = Murmur3Partitioner.instance.getToken(dk.getKey());
+        command = SinglePartitionReadCommand.fullPartitionRead(cfm, 
FBUtilities.nowInSeconds(), dk);
+
+        List<Replica> replicas = new ArrayList<>();
+        for (int i = 0; i < rf; i++)
+            replicas.add(full(UNIQUE_EP.get(i)));
+
+        EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), 
replicas.toArray(new Replica[0]));
+        EndpointsForToken emptyReplicas = 
EndpointsForToken.empty(dk.getToken());
+
+        Function<ClusterMetadata, ReplicaPlan.ForWrite> paxRecompute = 
(newClusterMetadata) -> ReplicaPlans.forWrite(ks, cl, t, 
ReplicaPlans.writeNormal);
+
+        eachQuorumPlan = new ReplicaPlan.ForWrite(ks, 
ks.getReplicationStrategy(), ConsistencyLevel.EACH_QUORUM, emptyReplicas, 
targetReplicas, emptyReplicas, targetReplicas, paxRecompute, 
ClusterMetadata.current().epoch);
+        debugLog("********************************** About to create LQP; 
check init below this");
+        localQuorumPlan = new ReplicaPlan.ForWrite(ks, 
ks.getReplicationStrategy(), ConsistencyLevel.LOCAL_QUORUM, emptyReplicas, 
targetReplicas, emptyReplicas, targetReplicas, paxRecompute, 
ClusterMetadata.current().epoch);
+
+        requestTime = new Dispatcher.RequestTime(Clock.Global.nanoTime());
+
+        testRunnable.run();
+    }
+
+    @Test
+    public void testRead()
+    {
+        runTestMatrix(CallbackType.NORMAL, () -> {
+            ReadCallback callback = new ReadCallback(resolver, command, 
readReplicaPlan, requestTime);
+            validateQuorum(callback.blockFor());
+            try
+            {
+                for (int i : shuffleEndpointOrder(rf))
+                {
+                    InetAddressAndPort ep = UNIQUE_EP.get(i);
+                    if (i >= dc1NodesDown)
+                    {
+                        ReadResponseTest.StubRepairedDataInfo rdi = new 
ReadResponseTest.StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+                        ReadResponse response = 
command.createResponse(EmptyIterators.unfilteredPartition(cfm), rdi);
+                        callback.onResponse(Message.synthetic(ep, 
Verb.READ_RSP, response));
+                    }
+                    else
+                        callback.onFailure(ep, RequestFailureReason.TIMEOUT);
+                }
+                callback.awaitResults();
+            }
+            catch (Exception e)
+            {
+                validateExceptionState(ReadTimeoutException.class, e);
+                return;
+            }
+            validateNoException(ReadTimeoutException.class);
+        });
+    }
+
+    @Test
+    public void testWrite()
+    {
+        runTestMatrix(CallbackType.NORMAL, () -> {
+            WriteResponseHandler callback = new 
WriteResponseHandler(writeReplicaPlan, WriteType.SIMPLE, null, requestTime);
+            validateQuorum(callback.blockFor());
+            Assert.assertSame(String.format("Got unexpected blockFor / 
requiredResponses from cl: %s.", cl), quorum, callback.blockFor());
+            try
+            {
+                for (int i : shuffleEndpointOrder(rf))
+                {
+                    InetAddressAndPort ep = UNIQUE_EP.get(i);
+                    if (i >= dc1NodesDown)
+                        callback.onResponse(Message.synthetic(ep, 
Verb.MUTATION_RSP, null));
+                    else
+                        callback.onFailure(ep, RequestFailureReason.TIMEOUT);
+                }
+                callback.get();
+            }
+            catch (Exception e)
+            {
+                validateExceptionState(WriteTimeoutException.class, e);
+                return;
+            }
+            validateNoException(WriteTimeoutException.class);
+        });
+    }
+
+    /**
+     * Effectively, anything that's a non-zero # of downed nodes can be 
expected to raise an exception on callback completion.
+     * Timeouts get swallowed into and converted to TruncateException's
+     */
+    @Test
+    public void testTruncate()
+    {
+        runTestMatrix(CallbackType.TRUNCATE, () -> {
+            TruncateResponseHandler callback = new 
TruncateResponseHandler(writeReplicaPlan.contacts().endpoints());
+            // Should always have blockFor == entire RF for Truncate
+            Assert.assertEquals(String.format("Got unexpected blockFor / 
requiredResponses from cl %s", cl), rf, callback.blockFor());
+            try
+            {
+                for (int i : shuffleEndpointOrder(rf))
+                {
+                    InetAddressAndPort ep = UNIQUE_EP.get(i);
+                    if (i >= dc1NodesDown)
+                        callback.onResponse(Message.synthetic(ep, 
Verb.PAXOS_PROPOSE_RSP, new TruncateResponse(ks.getName(), CF_STANDARD, true)));
+                    else
+                        callback.onFailure(ep, RequestFailureReason.TIMEOUT);
+                }
+                callback.get();
+            }
+            catch (Exception e)
+            {
+                validateExceptionState(TruncateException.class, e);
+                return;
+            }
+            validateNoException(TruncateException.class);
+        });
+    }
+
+    @Test
+    public void testEachQuorum()
+    {
+        runTestMatrix(CallbackType.EACH_QUORUM, () -> {
+            debugLog("---------------- CYCLE. rf: {}. nd1: {}. nd2: {}. 
quorum: {}", rf, dc1NodesDown, dc2NodesDown, quorum);
+            EachQuorumResponseHandler callback = new 
EachQuorumResponseHandler(eachQuorumPlan, null, WriteType.SIMPLE, null, 
requestTime);

Review Comment:
   I ran the test with coverage on and noticed the ideal CL stuff wasn't being exercised, so I tried adding
   
   ```
               callback.trackIdealCL(writeReplicaPlan.withConsistencyLevel(ConsistencyLevel.ALL));
   ```
   and found a counterexample
   
   ```
   java.lang.AssertionError: Property falsified after 7 example(s) 
   Smallest found falsifying value(s) :-
   {rf: 6, cl: ONE, nodesDown: 0}
   Cause was :-
   java.lang.AssertionError: Expected successful callback resolution but saw exception.
        at org.junit.Assert.fail(Assert.java:88)
        at org.apache.cassandra.net.CallbackPropertyTest.validateExceptionState(CallbackPropertyTest.java:773)
   ```
   
   I'm assuming setting an ideal CL should never fail the request.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to