jonmeredith commented on code in PR #4030:
URL: https://github.com/apache/cassandra/pull/4030#discussion_r2023656403
########## src/java/org/apache/cassandra/locator/ReplicaPlan.java: ########## @@ -355,9 +357,20 @@ private ForWrite copy(ConsistencyLevel newConsistencyLevel, EndpointsForToken ne return res; } - ForWrite withConsistencyLevel(ConsistencyLevel newConsistencylevel) { return copy(newConsistencylevel, contacts()); } + @VisibleForTesting + public ForWrite withConsistencyLevel(ConsistencyLevel newConsistencylevel) { return copy(newConsistencylevel, contacts()); } public ForWrite withContacts(EndpointsForToken newContact) { return copy(consistencyLevel, newContact); } + /** + * This raises an {@link IllegalArgumentException} if the state has changed; syntactic sugar around + * {@link #stillAppliesTo(ClusterMetadata)} making the calling convention a bit more ergonomic for the base case + * of "return true or raise an exception" + */ + public void checkStillAppliesTo(ClusterMetadata newMetadata) + { + stillAppliesTo(newMetadata); + } Review Comment: Why not rename the `stillAppliesTo` method? ########## src/java/org/apache/cassandra/service/WriteResponseHandler.java: ########## @@ -17,58 +17,375 @@ */ package org.apache.cassandra.service; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; +import javax.annotation.Nullable; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.locator.ReplicaPlan; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.net.Message; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.WriteType; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.exceptions.WriteFailureException; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlan.ForWrite; +import org.apache.cassandra.net.CallbackResponseTracker; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.RequestCallback; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.transport.Dispatcher; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.Condition; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; -/** - * Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels. 
- */ -public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T> +import static java.lang.Long.MAX_VALUE; +import static java.lang.Math.min; +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.cassandra.config.DatabaseDescriptor.getCounterWriteRpcTimeout; +import static org.apache.cassandra.config.DatabaseDescriptor.getWriteRpcTimeout; +import static org.apache.cassandra.db.WriteType.COUNTER; +import static org.apache.cassandra.locator.Replicas.countInOurDc; +import static org.apache.cassandra.schema.Schema.instance; +import static org.apache.cassandra.service.StorageProxy.WritePerformer; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; +import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition; + +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class WriteResponseHandler<T> implements RequestCallback<T> { protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class); - protected volatile int responses; - private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater - = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses"); + private final Condition condition = newOneTimeCondition(); + protected final ReplicaPlan.ForWrite replicaPlan; + + protected final Runnable callback; + protected final WriteType writeType; - public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, - Runnable callback, - WriteType writeType, - Supplier<Mutation> hintOnFailure, - Dispatcher.RequestTime requestTime) + private static final AtomicIntegerFieldUpdater<WriteResponseHandler> failuresUpdater = + AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "failureCount"); + private volatile int failureCount = 0; + + private final AtomicBoolean recordedMetric = new AtomicBoolean(false); + + protected final Dispatcher.RequestTime requestTime; + protected final CallbackResponseTracker tracker; + private @Nullable final Supplier<Mutation> hintOnFailure; + + /** + * The count of all the replicas we'd need to hit across DC's to qualify as hitting our "ideal" + * + * Of note, this isn't a calculation of what would qualify as a quorum across all DC's, i.e. "EACH_QUORUM", but rather + * a brute force calculation of what would equate to CL_ALL. This is how we originally calculated things on implementation + * in CASSANDRA-13289 so we continue with that. It would be relatively trivial to extend this logic to include + * a per-DC calculation similar to how we track different DC responses in {@link EachQuorumResponseHandler#pendingResponsesPerDC} + */ + @SuppressWarnings("JavadocReference") + protected final int idealCLReplicaCount; + + public WriteResponseHandler(ForWrite replicaPlan, + WriteType writeType, + Supplier<Mutation> hintOnFailure, + Dispatcher.RequestTime requestTime) { - super(replicaPlan, callback, writeType, hintOnFailure, requestTime); - responses = blockFor(); + this(replicaPlan, null, writeType, hintOnFailure, requestTime); } - public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, WriteType writeType, Supplier<Mutation> hintOnFailure, Dispatcher.RequestTime requestTime) + /** + * @param callback A callback to be called when the write is successful. 
+ * @param hintOnFailure Enable/disable hinting on write failure + * @param requestTime Initial request time of the mutation to be used for timeouts and backpressure calculation + */ + public WriteResponseHandler(ForWrite replicaPlan, + Runnable callback, + WriteType writeType, + Supplier<Mutation> hintOnFailure, + Dispatcher.RequestTime requestTime) Review Comment: formatting and partial javadoc. ########## src/java/org/apache/cassandra/service/StorageProxy.java: ########## @@ -1307,14 +1312,16 @@ else if (!firstColumnFamilyStore.equals(store)) } } - private static void syncWriteToBatchlog(Collection<Mutation> mutations, ReplicaPlan.ForWrite replicaPlan, TimeUUID uuid, Dispatcher.RequestTime requestTime) - throws WriteTimeoutException, WriteFailureException + private static void syncWriteToBatchlog(Collection<Mutation> mutations, + ReplicaPlan.ForWrite replicaPlan, + TimeUUID uuid, + Dispatcher.RequestTime requestTime) + throws WriteTimeoutException, WriteFailureException Review Comment: formatting nit ########## test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java: ########## @@ -60,6 +63,8 @@ public class WriteResponseHandlerTest { + protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandlerTest.class); + Review Comment: unused? ########## test/unit/org/apache/cassandra/net/CallbackPropertyTest.java: ########## @@ -0,0 +1,750 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.net; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.ServerTestUtils; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.EmptyIterators; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.ReadResponse; +import org.apache.cassandra.db.ReadResponseTest; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.TruncateResponse; +import org.apache.cassandra.db.WriteType; +import org.apache.cassandra.db.marshal.AsciiType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.exceptions.TruncateException; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlans; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.EachQuorumResponseHandler; +import org.apache.cassandra.service.LocalQuorumResponseHandler; +import org.apache.cassandra.service.TruncateResponseHandler; +import org.apache.cassandra.service.WriteResponseHandler; +import org.apache.cassandra.service.paxos.Ballot; +import org.apache.cassandra.service.paxos.Commit; +import org.apache.cassandra.service.paxos.PrepareResponse; +import org.apache.cassandra.service.paxos.v1.PrepareCallback; +import org.apache.cassandra.service.paxos.v1.ProposeCallback; +import org.apache.cassandra.service.reads.DataResolver; +import org.apache.cassandra.service.reads.ReadCallback; +import org.apache.cassandra.service.reads.repair.ReadRepair; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.transport.Dispatcher; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.RandomHelpers; +import org.quicktheories.generators.SourceDSL; + +import static org.apache.cassandra.Util.token; +import static org.apache.cassandra.locator.ReplicaUtils.EP1; +import static org.apache.cassandra.locator.ReplicaUtils.EP2; +import static org.apache.cassandra.locator.ReplicaUtils.EP3; +import static org.apache.cassandra.locator.ReplicaUtils.EP4; +import static org.apache.cassandra.locator.ReplicaUtils.EP5; +import static org.apache.cassandra.locator.ReplicaUtils.EP6; +import static 
org.apache.cassandra.locator.ReplicaUtils.EP7; +import static org.apache.cassandra.locator.ReplicaUtils.EP8; +import static org.apache.cassandra.locator.ReplicaUtils.EP9; +import static org.apache.cassandra.locator.ReplicaUtils.EP10; +import static org.apache.cassandra.locator.ReplicaUtils.UNIQUE_EP; +import static org.apache.cassandra.locator.ReplicaUtils.full; +import static org.quicktheories.QuickTheory.qt; + +@SuppressWarnings({ "unchecked", "rawtypes", "ZeroLengthArrayAllocation" }) +public class CallbackPropertyTest +{ + private static final Logger logger = LoggerFactory.getLogger(CallbackPropertyTest.class); + + private enum CallbackType + { + NORMAL, + PAXOS, + TRUNCATE, + EACH_QUORUM, + LOCAL_QUORUM + } + + /** Enable to print out details of the inferred test configuration setup **/ + private static boolean DEBUG = false; + + public static final String CF_STANDARD = "Standard1"; + + public static Keyspace ks; + public static ColumnFamilyStore cfs; + public static TableMetadata cfm; + + private static final String DC1 = "datacenter1"; + private static final String DC2 = "datacenter2"; + + /** By default, we'll test the CL's listed here unless the runner otherwise constrains things */ + private EnumSet<ConsistencyLevel> testedCL = EnumSet.of(ConsistencyLevel.ALL, ConsistencyLevel.QUORUM, ConsistencyLevel.ONE); + + public static DecoratedKey dk; + + private SinglePartitionReadCommand command; + + private ReplicaPlan.Shared readReplicaPlan; + private ReplicaPlan.ForWrite writeReplicaPlan; + private ReplicaPlan.ForPaxosWrite paxosReplicaPlan; + private ReplicaPlan.ForWrite eachQuorumPlan; + private ReplicaPlan.ForWrite localQuorumPlan; + + private DataResolver resolver; + private Dispatcher.RequestTime requestTime; + + private int dc1NodesDown; + private int dc2NodesDown = 0; + private int rf; + private ConsistencyLevel cl; + private int quorum; + + /** We calculate whether we expect a test to succeed in the runner to then compare against from inside the test case's context */ + private boolean caseExpectsException; + + private Runnable testRunnable; + + @BeforeClass + public static void setupClass() + { + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + + // No need to wait so long + DatabaseDescriptor.setReadRpcTimeout(100); + DatabaseDescriptor.setWriteRpcTimeout(100); + + ServerTestUtils.prepareServerNoRegister(); + + ClusterMetadataTestHelper.register(EP1, DC1, "R1"); + ClusterMetadataTestHelper.register(EP2, DC1, "R1"); + ClusterMetadataTestHelper.register(EP3, DC1, "R1"); + ClusterMetadataTestHelper.register(EP4, DC1, "R1"); + ClusterMetadataTestHelper.register(EP5, DC1, "R1"); + ClusterMetadataTestHelper.register(EP6, DC2, "R2"); + ClusterMetadataTestHelper.register(EP7, DC2, "R2"); + ClusterMetadataTestHelper.register(EP8, DC2, "R2"); + ClusterMetadataTestHelper.register(EP9, DC2, "R2"); + ClusterMetadataTestHelper.register(EP10, DC2, "R2"); + // We need to add endpoint's based on 0-4 == datacenter1, 5-9 == datacenter2. 
+ for (int i = 0; i < 5; i++) + { + InetAddressAndPort peer1 = UNIQUE_EP.get(i); + InetAddressAndPort peer2 = UNIQUE_EP.get(i + 5); + + Token t1 = token(i); + Token t2 = token(i + 5); + + ClusterMetadataTestHelper.join(peer1, t1); + ClusterMetadataTestHelper.join(peer2, t2); + } + } + + private void runTestMatrix(CallbackType type, Runnable r) + { + int minRF = 1; + testRunnable = r; + + // Based on the type of callback we're testing we'll constrain various properties + switch (type) + { + case PAXOS: + minRF = 2; + testedCL = EnumSet.of(ConsistencyLevel.SERIAL); Review Comment: Won't overwriting the class level testedCLs mean the tested CLs are dependent on the order `runTestMatrix` is called if it isn't reset for each `type`? ########## test/unit/org/apache/cassandra/utils/RandomHelpers.java: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.UUID; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.math3.random.MersenneTwister; +import org.apache.commons.math3.random.RandomGenerator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.quicktheories.core.DetatchedRandomnessSource; +import org.quicktheories.core.RandomnessSource; +import org.quicktheories.impl.Constraint; + +import static org.apache.cassandra.config.CassandraRelevantEnv.USER_SEED; + +/** + * Provides a more uniform generator w/a longer period, replacability, and some convenience methods around seeds for reproducibility. + */ +@SuppressWarnings("JavadocReference") +public final class RandomHelpers +{ + private static final Logger logger = LoggerFactory.getLogger(RandomHelpers.class); + private static long seed = Clock.Global.currentTimeMillis(); + private static RandomGenerator rg = new MersenneTwister(seed); + private static final SimpleRandomSource srs = new SimpleRandomSource(seed); + + @SuppressWarnings("unused") + public static void replaceRandomGenerator(RandomGenerator newGen) + { + rg = newGen; + } + + /** + * Will set the seed to the user defined value in env var {@link #USER_SEED} if set, otherwise current time in millis + */ + public static void setSeed() + { + long newSeed = USER_SEED.getString() != null + ? 
Long.parseLong(USER_SEED.getString()) + : Clock.Global.currentTimeMillis(); + setSeed(newSeed); + } + + public static void setSeed(long newSeed) + { + rg.setSeed(newSeed); + srs.setSeed(newSeed); + } + + public static void printSeed(String context) + { + logger.info("Random seed for context: " + context + ": " + seed); Review Comment: will be incorrect if `setSeed` has been called, but perhaps this is for printing on problems? ########## test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java: ########## @@ -28,6 +28,9 @@ import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + Review Comment: unused? ########## test/unit/org/apache/cassandra/utils/RandomHelpers.java: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.UUID; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.math3.random.MersenneTwister; +import org.apache.commons.math3.random.RandomGenerator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.quicktheories.core.DetatchedRandomnessSource; +import org.quicktheories.core.RandomnessSource; +import org.quicktheories.impl.Constraint; + +import static org.apache.cassandra.config.CassandraRelevantEnv.USER_SEED; + +/** + * Provides a more uniform generator w/a longer period, replacability, and some convenience methods around seeds for reproducibility. + */ +@SuppressWarnings("JavadocReference") +public final class RandomHelpers +{ + private static final Logger logger = LoggerFactory.getLogger(RandomHelpers.class); + private static long seed = Clock.Global.currentTimeMillis(); + private static RandomGenerator rg = new MersenneTwister(seed); + private static final SimpleRandomSource srs = new SimpleRandomSource(seed); + + @SuppressWarnings("unused") + public static void replaceRandomGenerator(RandomGenerator newGen) + { + rg = newGen; + } + + /** + * Will set the seed to the user defined value in env var {@link #USER_SEED} if set, otherwise current time in millis + */ + public static void setSeed() + { + long newSeed = USER_SEED.getString() != null + ? 
Long.parseLong(USER_SEED.getString()) + : Clock.Global.currentTimeMillis(); + setSeed(newSeed); + } + + public static void setSeed(long newSeed) + { + rg.setSeed(newSeed); + srs.setSeed(newSeed); + } + + public static void printSeed(String context) + { + logger.info("Random seed for context: " + context + ": " + seed); + } + + public static long getSeed() + { + return seed; + } + + public static String makeRandomStringBounded(int maxLength) + { + return makeRandomString(nextInt(maxLength)); + } + + // Creates a random string + public static String makeRandomString(int length) + { + char[] chars = new char[length]; + for (int i = 0; i < length; ++i) + chars[i++] = (char) ('a' + rg.nextInt('z' - 'a' + 1)); + return new String(chars); + } + + public static long nextLong() + { + return rg.nextLong(); + } + + public static int nextInt() + { + return rg.nextInt(); + } + + public static int nextInt(int bound) + { + return rg.nextInt(bound); + } + + public static void nextBytes(byte[] buffer) + { + rg.nextBytes(buffer); + } + + public static UUID nextUUID() + { + return Generators.UUID_RANDOM_GEN.generate(srs); + } + + /** + * TODO: Add {@code Gen<BigInteger>} support into {@link AbstractTypeGenerators#PRIMITIVE_TYPE_DATA_GENS} and call that, similar to {@link #nextUUID} + * @param bounds Max number of bytes for the BigInteger + */ + public static BigInteger nextBoundedBigInteger(int bounds) + { + return nextBigInteger(nextInt(bounds)); + } + + public static BigInteger nextBigInteger(int numBytes) + { + Preconditions.checkArgument(numBytes >= 0); + if (numBytes == 0) + return BigInteger.ZERO; + + ByteBuffer bb = ByteBuffer.wrap(new byte[numBytes]); + for (int i = 0; i < numBytes / 4; i++) + bb.putInt(nextInt()); + return new BigInteger(bb.array()); + } + + /** + * TODO: Add {@code Gen<BigDecimal>} support into {@link AbstractTypeGenerators#PRIMITIVE_TYPE_DATA_GENS} and call that, similar to {@link #nextUUID} Review Comment: still todo? ########## test/unit/org/apache/cassandra/utils/RandomHelpers.java: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.utils; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.UUID; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.math3.random.MersenneTwister; +import org.apache.commons.math3.random.RandomGenerator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.quicktheories.core.DetatchedRandomnessSource; +import org.quicktheories.core.RandomnessSource; +import org.quicktheories.impl.Constraint; + +import static org.apache.cassandra.config.CassandraRelevantEnv.USER_SEED; + +/** + * Provides a more uniform generator w/a longer period, replacability, and some convenience methods around seeds for reproducibility. + */ +@SuppressWarnings("JavadocReference") +public final class RandomHelpers +{ + private static final Logger logger = LoggerFactory.getLogger(RandomHelpers.class); + private static long seed = Clock.Global.currentTimeMillis(); + private static RandomGenerator rg = new MersenneTwister(seed); + private static final SimpleRandomSource srs = new SimpleRandomSource(seed); + + @SuppressWarnings("unused") + public static void replaceRandomGenerator(RandomGenerator newGen) + { + rg = newGen; + } + + /** + * Will set the seed to the user defined value in env var {@link #USER_SEED} if set, otherwise current time in millis + */ + public static void setSeed() + { + long newSeed = USER_SEED.getString() != null + ? Long.parseLong(USER_SEED.getString()) + : Clock.Global.currentTimeMillis(); + setSeed(newSeed); + } + + public static void setSeed(long newSeed) + { + rg.setSeed(newSeed); + srs.setSeed(newSeed); + } + + public static void printSeed(String context) + { + logger.info("Random seed for context: " + context + ": " + seed); + } + + public static long getSeed() + { + return seed; + } + + public static String makeRandomStringBounded(int maxLength) + { + return makeRandomString(nextInt(maxLength)); + } + + // Creates a random string + public static String makeRandomString(int length) + { + char[] chars = new char[length]; + for (int i = 0; i < length; ++i) + chars[i++] = (char) ('a' + rg.nextInt('z' - 'a' + 1)); + return new String(chars); + } + + public static long nextLong() + { + return rg.nextLong(); + } + + public static int nextInt() + { + return rg.nextInt(); + } + + public static int nextInt(int bound) + { + return rg.nextInt(bound); + } + + public static void nextBytes(byte[] buffer) + { + rg.nextBytes(buffer); + } + + public static UUID nextUUID() + { + return Generators.UUID_RANDOM_GEN.generate(srs); + } + + /** + * TODO: Add {@code Gen<BigInteger>} support into {@link AbstractTypeGenerators#PRIMITIVE_TYPE_DATA_GENS} and call that, similar to {@link #nextUUID} + * @param bounds Max number of bytes for the BigInteger + */ + public static BigInteger nextBoundedBigInteger(int bounds) + { + return nextBigInteger(nextInt(bounds)); + } + + public static BigInteger nextBigInteger(int numBytes) + { + Preconditions.checkArgument(numBytes >= 0); + if (numBytes == 0) + return BigInteger.ZERO; + + ByteBuffer bb = ByteBuffer.wrap(new byte[numBytes]); + for (int i = 0; i < numBytes / 4; i++) + bb.putInt(nextInt()); Review Comment: `org.apache.cassandra.index.sai.SAITester.Randomization#nextBigInteger(int)` is interesting, passing the random source into `BigInteger` to generate. It doesn't look like `nextBoundedBigInteger` that calls this is used. 
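For reference, a hedged sketch of the direction that `SAITester.Randomization` hint points in: hand the shared generator straight to the `BigInteger(int numBits, Random rnd)` constructor instead of packing `int` words into a buffer (the current loop also leaves any trailing `numBytes % 4` bytes at zero). The `RandomAdaptor` bridge and the byte-to-bit conversion below are assumptions about how this could slot into `RandomHelpers`, not the actual patch:

```java
import java.math.BigInteger;
import java.util.Random;

import org.apache.commons.math3.random.MersenneTwister;
import org.apache.commons.math3.random.RandomAdaptor;

// Illustration only: let BigInteger draw its bits directly from the shared generator.
final class BigIntegerSketch
{
    // Stand-in for RandomHelpers' static MersenneTwister field; the seed is a placeholder.
    private static final MersenneTwister rg = new MersenneTwister(12345L);
    // RandomAdaptor bridges a commons-math RandomGenerator to java.util.Random.
    private static final Random adapted = RandomAdaptor.createAdaptor(rg);

    // Uniform over [0, 2^(8 * numBytes) - 1]; numBytes == 0 yields BigInteger.ZERO.
    static BigInteger nextBigInteger(int numBytes)
    {
        if (numBytes < 0)
            throw new IllegalArgumentException("numBytes must be non-negative");
        return new BigInteger(8 * numBytes, adapted);
    }
}
```

Determinism still flows from the single seeded generator, so reproducibility through the seed helpers would be unaffected.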
########## src/java/org/apache/cassandra/service/paxos/v1/PrepareCallback.java: ########## @@ -40,30 +41,43 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> { private static final Logger logger = LoggerFactory.getLogger(PrepareCallback.class); - public boolean promised = true; - public Commit mostRecentCommit; - public Commit mostRecentInProgressCommit; + private boolean promised = true; + private Commit mostRecentCommit; + private Commit mostRecentInProgressCommit; private final Map<InetAddressAndPort, Commit> commitsByReplica = new ConcurrentHashMap<>(); - public PrepareCallback(DecoratedKey key, TableMetadata metadata, int targets, ConsistencyLevel consistency, Dispatcher.RequestTime requestTime) + + public PrepareCallback(DecoratedKey key, TableMetadata metadata, ReplicaPlan.ForPaxosWrite replicaPlan, Dispatcher.RequestTime requestTime) { - super(targets, consistency, requestTime); + super(replicaPlan.contacts().endpoints(), replicaPlan.requiredParticipants(), replicaPlan.consistencyLevel(), requestTime); // need to inject the right key in the empty commit so comparing with empty commits in the response works as expected mostRecentCommit = Commit.emptyCommit(key, metadata); mostRecentInProgressCommit = Commit.emptyCommit(key, metadata); } - public synchronized void onResponse(Message<PrepareResponse> message) + public boolean isPromised() { return promised; } + public Commit getMostRecentCommit() { return mostRecentCommit; } + public Commit getMostRecentInProgressCommit() { return mostRecentInProgressCommit; } + + public synchronized void onResponse(Message<PrepareResponse> msg) { - PrepareResponse response = message.payload; - logger.trace("Prepare response {} from {}", response, message.from()); + Preconditions.checkNotNull(msg, "Got unexpected null message in onResponse callback."); + PrepareResponse response = msg.payload; + tracker.recordResponse(msg.from()); + logger.trace("Prepare response {} from {}", response, msg.from()); + + // We don't want to flip the promise state if we've already succeeded or failed on this callback; if we've hit + // a quorum promised subsequent failures shouldn't be considered in our calculation. + if (latch.count() == 0) + return; // We set the mostRecentInProgressCommit even if we're not promised as, in that case, the ballot of that commit // will be used to avoid generating a ballot that has not chance to win on retry (think clock skew). if (response.inProgressCommit.isAfter(mostRecentInProgressCommit)) mostRecentInProgressCommit = response.inProgressCommit; + // Immediately bail out if any participant declines if (!response.promised) { promised = false; Review Comment: should this be DRYed up with `signal()`? 
########## src/java/org/apache/cassandra/service/WriteResponseHandler.java: ########## @@ -17,58 +17,375 @@ */ package org.apache.cassandra.service; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; +import javax.annotation.Nullable; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.locator.ReplicaPlan; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.net.Message; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.WriteType; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.exceptions.WriteFailureException; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlan.ForWrite; +import org.apache.cassandra.net.CallbackResponseTracker; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.RequestCallback; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.transport.Dispatcher; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.Condition; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; -/** - * Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels. 
- */ -public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T> +import static java.lang.Long.MAX_VALUE; +import static java.lang.Math.min; +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.cassandra.config.DatabaseDescriptor.getCounterWriteRpcTimeout; +import static org.apache.cassandra.config.DatabaseDescriptor.getWriteRpcTimeout; +import static org.apache.cassandra.db.WriteType.COUNTER; +import static org.apache.cassandra.locator.Replicas.countInOurDc; +import static org.apache.cassandra.schema.Schema.instance; +import static org.apache.cassandra.service.StorageProxy.WritePerformer; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; +import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition; + +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class WriteResponseHandler<T> implements RequestCallback<T> { protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class); - protected volatile int responses; - private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater - = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses"); + private final Condition condition = newOneTimeCondition(); + protected final ReplicaPlan.ForWrite replicaPlan; + + protected final Runnable callback; + protected final WriteType writeType; - public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, - Runnable callback, - WriteType writeType, - Supplier<Mutation> hintOnFailure, - Dispatcher.RequestTime requestTime) + private static final AtomicIntegerFieldUpdater<WriteResponseHandler> failuresUpdater = + AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "failureCount"); + private volatile int failureCount = 0; + + private final AtomicBoolean recordedMetric = new AtomicBoolean(false); + + protected final Dispatcher.RequestTime requestTime; + protected final CallbackResponseTracker tracker; + private @Nullable final Supplier<Mutation> hintOnFailure; + + /** + * The count of all the replicas we'd need to hit across DC's to qualify as hitting our "ideal" + * + * Of note, this isn't a calculation of what would qualify as a quorum across all DC's, i.e. "EACH_QUORUM", but rather + * a brute force calculation of what would equate to CL_ALL. This is how we originally calculated things on implementation + * in CASSANDRA-13289 so we continue with that. It would be relatively trivial to extend this logic to include + * a per-DC calculation similar to how we track different DC responses in {@link EachQuorumResponseHandler#pendingResponsesPerDC} + */ + @SuppressWarnings("JavadocReference") + protected final int idealCLReplicaCount; + + public WriteResponseHandler(ForWrite replicaPlan, + WriteType writeType, + Supplier<Mutation> hintOnFailure, + Dispatcher.RequestTime requestTime) { - super(replicaPlan, callback, writeType, hintOnFailure, requestTime); - responses = blockFor(); + this(replicaPlan, null, writeType, hintOnFailure, requestTime); } - public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, WriteType writeType, Supplier<Mutation> hintOnFailure, Dispatcher.RequestTime requestTime) + /** + * @param callback A callback to be called when the write is successful. 
+ * @param hintOnFailure Enable/disable hinting on write failure + * @param requestTime Initial request time of the mutation to be used for timeouts and backpressure calculation + */ + public WriteResponseHandler(ForWrite replicaPlan, + Runnable callback, + WriteType writeType, + Supplier<Mutation> hintOnFailure, + Dispatcher.RequestTime requestTime) { - this(replicaPlan, null, writeType, hintOnFailure, requestTime); + this.replicaPlan = replicaPlan; + this.callback = callback; + this.writeType = writeType; + this.hintOnFailure = hintOnFailure; + this.tracker = new CallbackResponseTracker(replicaPlan.contacts().endpoints(), WriteResponseHandler.blockFor(replicaPlan)); + this.requestTime = requestTime; + this.idealCLReplicaCount = replicaPlan.contacts().size(); } - public void onResponse(Message<T> m) + /** + * Intended for use in DC-aware CL's, we'll track the DC responses in the response tracker. + */ + public void trackIdealCL(ReplicaPlan.ForWrite replicaPlan) { - replicaPlan.collectSuccess(m == null ? FBUtilities.getBroadcastAddressAndPort() : m.from()); - if (responsesUpdater.decrementAndGet(this) == 0) + tracker.enableIdealCLTracking(replicaPlan); + } + + public void get() throws WriteTimeoutException, WriteFailureException + { + long timeoutNanos = currentTimeoutNanos(); + + boolean signaled; + Map<InetAddressAndPort, RequestFailureReason> failuresByEndpoint; + try + { + signaled = condition.await(timeoutNanos, NANOSECONDS); + failuresByEndpoint = tracker.endProcessing(); + } + catch (InterruptedException e) + { + throw new UncheckedInterruptedException(e); + } + + if (signaled && receivedSufficientResponses()) + { + replicaPlan.checkStillAppliesTo(ClusterMetadata.current()); + return; + } + + if (!signaled) + throwTimeout(failuresByEndpoint); + + if (!receivedSufficientResponses()) + { + // We only want to consider replicas that count towards our blockFor() / requiredResponses threshold here to flag as a timeout + if (RequestCallback.isTimeout(failuresByEndpoint.keySet().stream() + .filter(this::waitingFor) + .collect(Collectors.toMap(Function.identity(), failuresByEndpoint::get)))) + throwTimeout(failuresByEndpoint); + throw new WriteFailureException(writeType, replicaPlan.consistencyLevel(), tracker.responseCount(), blockFor(), failuresByEndpoint); + } + + replicaPlan.checkStillAppliesTo(ClusterMetadata.current()); + } + + private void throwTimeout(Map<InetAddressAndPort, RequestFailureReason> failures) + { + int blockedFor = blockFor(); + int acks = tracker.responseCount(); + // It's pretty unlikely, but we can race between exiting await above and here, so + // that we could now have enough acks. In that case, we "lie" on the acks count to + // avoid sending confusing info to the user (see CASSANDRA-6491). + if (acks >= blockedFor) + acks = blockedFor - 1; + throw WriteTimeoutException.withParticipants(writeType, replicaPlan.consistencyLevel(), acks, blockedFor, failures); + } + + public final long currentTimeoutNanos() + { + long now = nanoTime(); + long requestTimeout = writeType == COUNTER + ? getCounterWriteRpcTimeout(NANOSECONDS) + : getWriteRpcTimeout(NANOSECONDS); + return requestTime.computeTimeout(now, requestTimeout); + } + + /** + * @return the minimum number of endpoints that must respond. 
+ */ + @VisibleForTesting + public int blockFor() + { + // During bootstrap, we have to include the pending endpoints or we may fail the consistency level + // guarantees (see #833) + return blockFor(replicaPlan); + } + + public static int blockFor(ForWrite replicaPlan) + { + return replicaPlan.writeQuorum(); + } + + /** + * TODO: this method is brittle for its purpose of deciding when we should fail a query; + * this needs to be aware of which nodes are live/down Review Comment: is this TODO carried through in the refactor or should there be a JIRA here that tracks the work? ########## test/unit/org/apache/cassandra/utils/RandomHelpers.java: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.UUID; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.math3.random.MersenneTwister; +import org.apache.commons.math3.random.RandomGenerator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.quicktheories.core.DetatchedRandomnessSource; +import org.quicktheories.core.RandomnessSource; +import org.quicktheories.impl.Constraint; + +import static org.apache.cassandra.config.CassandraRelevantEnv.USER_SEED; + +/** + * Provides a more uniform generator w/a longer period, replacability, and some convenience methods around seeds for reproducibility. + */ +@SuppressWarnings("JavadocReference") +public final class RandomHelpers +{ + private static final Logger logger = LoggerFactory.getLogger(RandomHelpers.class); + private static long seed = Clock.Global.currentTimeMillis(); + private static RandomGenerator rg = new MersenneTwister(seed); + private static final SimpleRandomSource srs = new SimpleRandomSource(seed); + + @SuppressWarnings("unused") + public static void replaceRandomGenerator(RandomGenerator newGen) + { + rg = newGen; + } + + /** + * Will set the seed to the user defined value in env var {@link #USER_SEED} if set, otherwise current time in millis + */ + public static void setSeed() + { + long newSeed = USER_SEED.getString() != null + ? 
Long.parseLong(USER_SEED.getString()) + : Clock.Global.currentTimeMillis(); + setSeed(newSeed); + } + + public static void setSeed(long newSeed) + { + rg.setSeed(newSeed); + srs.setSeed(newSeed); + } + + public static void printSeed(String context) + { + logger.info("Random seed for context: " + context + ": " + seed); + } + + public static long getSeed() + { + return seed; + } + + public static String makeRandomStringBounded(int maxLength) + { + return makeRandomString(nextInt(maxLength)); + } + + // Creates a random string + public static String makeRandomString(int length) + { + char[] chars = new char[length]; + for (int i = 0; i < length; ++i) + chars[i++] = (char) ('a' + rg.nextInt('z' - 'a' + 1)); + return new String(chars); + } + + public static long nextLong() + { + return rg.nextLong(); + } + + public static int nextInt() + { + return rg.nextInt(); + } + + public static int nextInt(int bound) + { + return rg.nextInt(bound); + } + + public static void nextBytes(byte[] buffer) + { + rg.nextBytes(buffer); + } + + public static UUID nextUUID() + { + return Generators.UUID_RANDOM_GEN.generate(srs); + } + + /** + * TODO: Add {@code Gen<BigInteger>} support into {@link AbstractTypeGenerators#PRIMITIVE_TYPE_DATA_GENS} and call that, similar to {@link #nextUUID} Review Comment: still todo? ########## src/java/org/apache/cassandra/batchlog/BatchlogManager.java: ########## @@ -548,18 +548,25 @@ private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler< } @Override - protected int blockFor() + public int blockFor() { return this.replicaPlan.contacts().size(); } @Override public void onResponse(Message<T> m) { + // tracker update handled in super boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddressAndPort() : m.from()); assert removed; super.onResponse(m); } + + @Override + public boolean receivedSufficientResponses() + { + return super.receivedSufficientResponses(); + } Review Comment: Why override? For debugging? ########## test/unit/org/apache/cassandra/net/CallbackPropertyTest.java: ########## @@ -0,0 +1,750 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.net; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.ServerTestUtils; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.EmptyIterators; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.ReadResponse; +import org.apache.cassandra.db.ReadResponseTest; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.TruncateResponse; +import org.apache.cassandra.db.WriteType; +import org.apache.cassandra.db.marshal.AsciiType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.exceptions.TruncateException; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlans; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.EachQuorumResponseHandler; +import org.apache.cassandra.service.LocalQuorumResponseHandler; +import org.apache.cassandra.service.TruncateResponseHandler; +import org.apache.cassandra.service.WriteResponseHandler; +import org.apache.cassandra.service.paxos.Ballot; +import org.apache.cassandra.service.paxos.Commit; +import org.apache.cassandra.service.paxos.PrepareResponse; +import org.apache.cassandra.service.paxos.v1.PrepareCallback; +import org.apache.cassandra.service.paxos.v1.ProposeCallback; +import org.apache.cassandra.service.reads.DataResolver; +import org.apache.cassandra.service.reads.ReadCallback; +import org.apache.cassandra.service.reads.repair.ReadRepair; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.transport.Dispatcher; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.RandomHelpers; +import org.quicktheories.generators.SourceDSL; + +import static org.apache.cassandra.Util.token; +import static org.apache.cassandra.locator.ReplicaUtils.EP1; +import static org.apache.cassandra.locator.ReplicaUtils.EP2; +import static org.apache.cassandra.locator.ReplicaUtils.EP3; +import static org.apache.cassandra.locator.ReplicaUtils.EP4; +import static org.apache.cassandra.locator.ReplicaUtils.EP5; +import static org.apache.cassandra.locator.ReplicaUtils.EP6; +import static 
org.apache.cassandra.locator.ReplicaUtils.EP7; +import static org.apache.cassandra.locator.ReplicaUtils.EP8; +import static org.apache.cassandra.locator.ReplicaUtils.EP9; +import static org.apache.cassandra.locator.ReplicaUtils.EP10; +import static org.apache.cassandra.locator.ReplicaUtils.UNIQUE_EP; +import static org.apache.cassandra.locator.ReplicaUtils.full; +import static org.quicktheories.QuickTheory.qt; + +@SuppressWarnings({ "unchecked", "rawtypes", "ZeroLengthArrayAllocation" }) +public class CallbackPropertyTest +{ + private static final Logger logger = LoggerFactory.getLogger(CallbackPropertyTest.class); + + private enum CallbackType + { + NORMAL, + PAXOS, + TRUNCATE, + EACH_QUORUM, + LOCAL_QUORUM + } + + /** Enable to print out details of the inferred test configuration setup **/ + private static boolean DEBUG = false; + + public static final String CF_STANDARD = "Standard1"; + + public static Keyspace ks; + public static ColumnFamilyStore cfs; + public static TableMetadata cfm; + + private static final String DC1 = "datacenter1"; + private static final String DC2 = "datacenter2"; + + /** By default, we'll test the CL's listed here unless the runner otherwise constrains things */ + private EnumSet<ConsistencyLevel> testedCL = EnumSet.of(ConsistencyLevel.ALL, ConsistencyLevel.QUORUM, ConsistencyLevel.ONE); + + public static DecoratedKey dk; + + private SinglePartitionReadCommand command; + + private ReplicaPlan.Shared readReplicaPlan; + private ReplicaPlan.ForWrite writeReplicaPlan; + private ReplicaPlan.ForPaxosWrite paxosReplicaPlan; + private ReplicaPlan.ForWrite eachQuorumPlan; + private ReplicaPlan.ForWrite localQuorumPlan; + + private DataResolver resolver; + private Dispatcher.RequestTime requestTime; + + private int dc1NodesDown; + private int dc2NodesDown = 0; + private int rf; + private ConsistencyLevel cl; + private int quorum; + + /** We calculate whether we expect a test to succeed in the runner to then compare against from inside the test case's context */ + private boolean caseExpectsException; + + private Runnable testRunnable; + + @BeforeClass + public static void setupClass() + { + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + + // No need to wait so long + DatabaseDescriptor.setReadRpcTimeout(100); + DatabaseDescriptor.setWriteRpcTimeout(100); + + ServerTestUtils.prepareServerNoRegister(); + + ClusterMetadataTestHelper.register(EP1, DC1, "R1"); + ClusterMetadataTestHelper.register(EP2, DC1, "R1"); + ClusterMetadataTestHelper.register(EP3, DC1, "R1"); + ClusterMetadataTestHelper.register(EP4, DC1, "R1"); + ClusterMetadataTestHelper.register(EP5, DC1, "R1"); + ClusterMetadataTestHelper.register(EP6, DC2, "R2"); + ClusterMetadataTestHelper.register(EP7, DC2, "R2"); + ClusterMetadataTestHelper.register(EP8, DC2, "R2"); + ClusterMetadataTestHelper.register(EP9, DC2, "R2"); + ClusterMetadataTestHelper.register(EP10, DC2, "R2"); + // We need to add endpoint's based on 0-4 == datacenter1, 5-9 == datacenter2. 
+ for (int i = 0; i < 5; i++) + { + InetAddressAndPort peer1 = UNIQUE_EP.get(i); + InetAddressAndPort peer2 = UNIQUE_EP.get(i + 5); + + Token t1 = token(i); + Token t2 = token(i + 5); + + ClusterMetadataTestHelper.join(peer1, t1); + ClusterMetadataTestHelper.join(peer2, t2); + } + } + + private void runTestMatrix(CallbackType type, Runnable r) + { + int minRF = 1; + testRunnable = r; + + // Based on the type of callback we're testing we'll constrain various properties + switch (type) + { + case PAXOS: + minRF = 2; + testedCL = EnumSet.of(ConsistencyLevel.SERIAL); + break; + case TRUNCATE: + testedCL = EnumSet.of(ConsistencyLevel.ALL); + cl = ConsistencyLevel.ALL; + break; + case EACH_QUORUM: + cl = ConsistencyLevel.EACH_QUORUM; + break; + case LOCAL_QUORUM: + cl = ConsistencyLevel.LOCAL_QUORUM; + break; + case NORMAL: + default: + // pass -> respect CL and test them all + } + + if (type == CallbackType.EACH_QUORUM || type == CallbackType.LOCAL_QUORUM) + { + qt().forAll( + SourceDSL.integers().between(minRF, 5).describedAs(rf -> "rf: " + rf), + SourceDSL.integers().between(0, 5).describedAs(nd1 -> "DC1NodesDown: " + nd1), + SourceDSL.integers().between(0, 5).describedAs(nd2 -> "DC2NodesDown: " + nd2) + ) + .checkAssert(this::runMultiDCTest); + } + else + { + qt().forAll( + SourceDSL.integers().between(minRF, 10).describedAs(rf -> "rf: " + rf), + SourceDSL.arbitrary().pick(testedCL.toArray(new ConsistencyLevel[0])).describedAs(cl -> "cl: " + cl), + SourceDSL.integers().between(0, 10).describedAs(nd -> "nodesDown: " + dc1NodesDown) + ) + .checkAssert(this::runSingleDCTest); + } + } + + private void runSingleDCTest(int rf, ConsistencyLevel cl, int nodesDown) + { + if (nodesDown > rf) return; // Invalid scenario, skip + + this.rf = rf; + this.cl = cl; + this.dc1NodesDown = nodesDown; + + String ksName = "RequestCallbackTest" + rf + '_' + cl + '_' + nodesDown + System.nanoTime(); + TableMetadata.Builder builder = TableMetadata.builder(ksName, CF_STANDARD) + .addPartitionKeyColumn("key", BytesType.instance) + .addClusteringColumn("col1", AsciiType.instance) + .addRegularColumn("c1", AsciiType.instance) + .addRegularColumn("c2", AsciiType.instance) + .addRegularColumn("one", AsciiType.instance) + .addRegularColumn("two", AsciiType.instance); + + SchemaLoader.createKeyspace(ksName, KeyspaceParams.simple(rf), builder); + + this.quorum = ReplicaPlans.calculateQuorumMajority(rf); + if (cl == ConsistencyLevel.ALL) this.quorum = rf; + if (cl == ConsistencyLevel.ONE) this.quorum = 1; + + this.caseExpectsException = rf - nodesDown < quorum; + + logger.info("RUNNING TEST CASE: rf: {}. cl: {}. nodesDown: {}. quorum: {}. expectedException: {}", rf, cl, nodesDown, quorum, caseExpectsException); + + ks = Keyspace.open(ksName); + cfs = ks.getColumnFamilyStore(CF_STANDARD); + cfm = cfs.metadata(); + dk = Util.dk("key1"); + Token t = Murmur3Partitioner.instance.getToken(dk.getKey()); + command = SinglePartitionReadCommand.fullPartitionRead(cfm, FBUtilities.nowInSeconds(), dk); + + List<Replica> replicas = new ArrayList<>(); + for (int i = 0; i < rf; i++) + replicas.add(full(UNIQUE_EP.get(i))); + + EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), replicas.toArray(new Replica[0])); + EndpointsForToken emptyReplicas = EndpointsForToken.empty(dk.getToken()); + + // Since all callbacks excepting multi-DC cl are handled by this test runner, we init replica plans we may not use here. 
/shrug + ReplicaPlan.ForTokenRead plan = ReplicaPlans.forRead(ks, dk.getToken(), null, cl, cfs.metadata().params.speculativeRetry); + readReplicaPlan = ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, + ks.getReplicationStrategy(), + cl, + targetReplicas, + targetReplicas, + (newClusterMetadata) -> plan, + (self) -> { throw new IllegalStateException("Read repair is not supported for these tests"); }, + ClusterMetadata.current().epoch) + ); + + Function<ClusterMetadata, ReplicaPlan.ForWrite> writeRecompute = (newClusterMetadata) -> ReplicaPlans.forWrite(ks, cl, t, ReplicaPlans.writeNormal); + writeReplicaPlan = new ReplicaPlan.ForWrite(ks, ks.getReplicationStrategy(), cl, emptyReplicas, targetReplicas, emptyReplicas, targetReplicas, writeRecompute, ClusterMetadata.current().epoch); + + int paxosRequired = ReplicaPlans.calculateQuorumMajority(targetReplicas.size()); + Function<ClusterMetadata, ReplicaPlan.ForWrite> paxosRecompute = (newClusterMetadata) -> ReplicaPlans.forPaxos(ks, dk, cl); + paxosReplicaPlan = ReplicaPlan.ForPaxosWrite.forTest(ks, cl, emptyReplicas, targetReplicas, emptyReplicas, targetReplicas, paxosRequired, paxosRecompute, ClusterMetadata.current().epoch); + + ReadRepair readRepair = ReadRepair.create(command, readReplicaPlan, requestTime); + resolver = new DataResolver(command, readReplicaPlan, readRepair, requestTime); + + requestTime = new Dispatcher.RequestTime(Clock.Global.nanoTime()); + + testRunnable.run(); + } + + private void runMultiDCTest(int rf, int dc1NodesDown, int dc2NodesDown) + { + if (dc1NodesDown > rf) return; // invalid param arrangement; skip + if (dc2NodesDown > rf) return; // invalid param arrangement; skip + + this.rf = rf; + this.quorum = ReplicaPlans.calculateQuorumMajority(rf); + this.dc1NodesDown = dc1NodesDown; + this.dc2NodesDown = dc2NodesDown; + + // Only care about DC1 on local quorum check + caseExpectsException = cl == ConsistencyLevel.LOCAL_QUORUM + ? rf - dc1NodesDown < quorum + : rf - dc1NodesDown < quorum || rf - dc2NodesDown < quorum; + + String ksName = "RequestCallbackTest" + rf + '_' + cl + '_' + dc1NodesDown + '_' + dc2NodesDown; + TableMetadata.Builder builder = TableMetadata.builder(ksName, CF_STANDARD) + .addPartitionKeyColumn("key", BytesType.instance) + .addClusteringColumn("col1", AsciiType.instance) + .addRegularColumn("c1", AsciiType.instance) + .addRegularColumn("c2", AsciiType.instance) + .addRegularColumn("one", AsciiType.instance) + .addRegularColumn("two", AsciiType.instance); + + logger.info("RUNNING TEST CASE: rf: {}. cl: {}. dc1NodesDown: {}. dc2NodesDown: {}, quorum: {}. 
expectedException: {}", rf, cl, dc1NodesDown, dc2NodesDown, quorum, caseExpectsException); + KeyspaceParams params = KeyspaceParams.nts(DC1, rf, DC2, rf); + SchemaLoader.createKeyspace(ksName, params, builder); + + ks = Keyspace.open(ksName); + cfs = ks.getColumnFamilyStore(CF_STANDARD); + cfm = cfs.metadata(); + dk = Util.dk("key1"); + Token t = Murmur3Partitioner.instance.getToken(dk.getKey()); + command = SinglePartitionReadCommand.fullPartitionRead(cfm, FBUtilities.nowInSeconds(), dk); + + List<Replica> replicas = new ArrayList<>(); + for (int i = 0; i < rf; i++) + replicas.add(full(UNIQUE_EP.get(i))); + + EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), replicas.toArray(new Replica[0])); + EndpointsForToken emptyReplicas = EndpointsForToken.empty(dk.getToken()); + + Function<ClusterMetadata, ReplicaPlan.ForWrite> paxRecompute = (newClusterMetadata) -> ReplicaPlans.forWrite(ks, cl, t, ReplicaPlans.writeNormal); + + eachQuorumPlan = new ReplicaPlan.ForWrite(ks, ks.getReplicationStrategy(), ConsistencyLevel.EACH_QUORUM, emptyReplicas, targetReplicas, emptyReplicas, targetReplicas, paxRecompute, ClusterMetadata.current().epoch); + debugLog("********************************** About to create LQP; check init below this"); + localQuorumPlan = new ReplicaPlan.ForWrite(ks, ks.getReplicationStrategy(), ConsistencyLevel.LOCAL_QUORUM, emptyReplicas, targetReplicas, emptyReplicas, targetReplicas, paxRecompute, ClusterMetadata.current().epoch); + + requestTime = new Dispatcher.RequestTime(Clock.Global.nanoTime()); + + testRunnable.run(); + } + + @Test + public void testRead() + { + runTestMatrix(CallbackType.NORMAL, () -> { + ReadCallback callback = new ReadCallback(resolver, command, readReplicaPlan, requestTime); + validateQuorum(callback.blockFor()); + try + { + for (int i : shuffleEndpointOrder(rf)) + { + InetAddressAndPort ep = UNIQUE_EP.get(i); + if (i >= dc1NodesDown) + { + ReadResponseTest.StubRepairedDataInfo rdi = new ReadResponseTest.StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(cfm), rdi); + callback.onResponse(Message.synthetic(ep, Verb.READ_RSP, response)); + } + else + callback.onFailure(ep, RequestFailureReason.TIMEOUT); + } + callback.awaitResults(); + } + catch (Exception e) + { + validateExceptionState(ReadTimeoutException.class, e); + return; + } + validateNoException(ReadTimeoutException.class); + }); + } + + @Test + public void testWrite() + { + runTestMatrix(CallbackType.NORMAL, () -> { + WriteResponseHandler callback = new WriteResponseHandler(writeReplicaPlan, WriteType.SIMPLE, null, requestTime); + validateQuorum(callback.blockFor()); + Assert.assertSame(String.format("Got unexpected blockFor / requiredResponses from cl: %s.", cl), quorum, callback.blockFor()); + try + { + for (int i : shuffleEndpointOrder(rf)) + { + InetAddressAndPort ep = UNIQUE_EP.get(i); + if (i >= dc1NodesDown) + callback.onResponse(Message.synthetic(ep, Verb.MUTATION_RSP, null)); + else + callback.onFailure(ep, RequestFailureReason.TIMEOUT); + } + callback.get(); + } + catch (Exception e) + { + validateExceptionState(WriteTimeoutException.class, e); + return; + } + validateNoException(WriteTimeoutException.class); + }); + } + + /** + * Effectively, anything that's a non-zero # of downed nodes can be expected to raise an exception on callback completion. 
+ * Timeouts get swallowed into and converted to TruncateException's + */ + @Test + public void testTruncate() + { + runTestMatrix(CallbackType.TRUNCATE, () -> { + TruncateResponseHandler callback = new TruncateResponseHandler(writeReplicaPlan.contacts().endpoints()); + // Should always have blockFor == entire RF for Truncate + Assert.assertEquals(String.format("Got unexpected blockFor / requiredResponses from cl %s", cl), rf, callback.blockFor()); + try + { + for (int i : shuffleEndpointOrder(rf)) + { + InetAddressAndPort ep = UNIQUE_EP.get(i); + if (i >= dc1NodesDown) + callback.onResponse(Message.synthetic(ep, Verb.PAXOS_PROPOSE_RSP, new TruncateResponse(ks.getName(), CF_STANDARD, true))); + else + callback.onFailure(ep, RequestFailureReason.TIMEOUT); + } + callback.get(); + } + catch (Exception e) + { + validateExceptionState(TruncateException.class, e); + return; + } + validateNoException(TruncateException.class); + }); + } + + @Test + public void testEachQuorum() + { + runTestMatrix(CallbackType.EACH_QUORUM, () -> { + debugLog("---------------- CYCLE. rf: {}. nd1: {}. nd2: {}. quorum: {}", rf, dc1NodesDown, dc2NodesDown, quorum); + EachQuorumResponseHandler callback = new EachQuorumResponseHandler(eachQuorumPlan, null, WriteType.SIMPLE, null, requestTime); Review Comment: I ran the test with coverage on and noticed the ideal CL stuff wasn't being excercised, so I tried adding ``` callback.trackIdealCL(writeReplicaPlan.withConsistencyLevel(ConsistencyLevel.ALL)); ``` and found a counterexample ``` java.lang.AssertionError: Property falsified after 7 example(s) Smallest found falsifying value(s) :- {rf: 6, cl: ONE, nodesDown: 0} Cause was :- java.lang.AssertionError: Expected successful callback resolution but saw exception. at org.junit.Assert.fail(Assert.java:88) at org.apache.cassandra.net.CallbackPropertyTest.validateExceptionState(CallbackPropertyTest.java:773) ``` I'm assuming setting an ideal CL should never fail the request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org