This is an automated email from the ASF dual-hosted git repository. adelapena pushed a commit to branch cassandra-3.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push: new e5c3d08 Operational improvements and hardening for replica filtering protection patch by Caleb Rackliffe; reviewed by Andrés de la Peña for CASSANDRA-15907 e5c3d08 is described below commit e5c3d08a1428d378b6690f0419a2b25724b9736e Author: Caleb Rackliffe <calebrackli...@gmail.com> AuthorDate: Mon Jun 29 13:54:23 2020 -0500 Operational improvements and hardening for replica filtering protection patch by Caleb Rackliffe; reviewed by Andrés de la Peña for CASSANDRA-15907 --- CHANGES.txt | 1 + build.xml | 2 +- conf/cassandra.yaml | 20 + src/java/org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 20 + .../config/ReplicaFilteringProtectionOptions.java | 28 ++ .../db/partitions/PartitionIterators.java | 49 ++- .../partitions/UnfilteredPartitionIterators.java | 19 - .../apache/cassandra/db/rows/EncodingStats.java | 24 ++ .../org/apache/cassandra/metrics/TableMetrics.java | 22 +- .../org/apache/cassandra/service/DataResolver.java | 89 +++-- .../service/ReplicaFilteringProtection.java | 423 ++++++++++++--------- .../apache/cassandra/service/StorageService.java | 27 +- .../cassandra/service/StorageServiceMBean.java | 12 + .../org/apache/cassandra/utils/FBUtilities.java | 4 +- .../cassandra/utils/concurrent/Accumulator.java | 9 +- .../cassandra/distributed/impl/Coordinator.java | 10 + .../apache/cassandra/distributed/impl/RowUtil.java | 7 +- .../test/ReplicaFilteringProtectionTest.java | 244 ++++++++++++ .../cassandra/db/rows/EncodingStatsTest.java | 145 +++++++ .../utils/concurrent/AccumulatorTest.java | 34 +- 21 files changed, 924 insertions(+), 267 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index a755b7e..182dca3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.22: + * Operational improvements and hardening for replica filtering protection (CASSANDRA-15907) * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191) * 3.x fails to start if commit log has range tombstones from a column which is also deleted (CASSANDRA-15970) * Forbid altering UDTs used in partition keys (CASSANDRA-15933) diff --git a/build.xml b/build.xml index 6492767..6c1d148 100644 --- a/build.xml +++ b/build.xml @@ -398,7 +398,7 @@ </dependency> <dependency groupId="junit" artifactId="junit" version="4.6" /> <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" /> - <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.3" /> + <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.4" /> <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10"> <exclusion groupId="commons-lang" artifactId="commons-lang"/> </dependency> diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index c321a72..bb96f18 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -661,6 +661,26 @@ auto_snapshot: true tombstone_warn_threshold: 1000 tombstone_failure_threshold: 100000 +# Filtering and secondary index queries at read consistency levels above ONE/LOCAL_ONE use a +# mechanism called replica filtering protection to ensure that results from stale replicas do +# not violate consistency. (See CASSANDRA-8272 and CASSANDRA-15907 for more details.) This +# mechanism materializes replica results by partition on-heap at the coordinator. The more possibly +# stale results returned by the replicas, the more rows materialized during the query. 
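+# As a rough illustration of the defaults below: a single query that leads the coordinator to +# cache more than 2000 rows across all replica responses logs a warning (once per query), and +# a query that caches more than 32000 rows is aborted with an OverloadedException.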
+replica_filtering_protection: + # These thresholds exist to limit the damage severely out-of-date replicas can cause during these + # queries. They limit the number of rows from all replicas that individual index and filtering queries + # can materialize on-heap to return correct results at the desired read consistency level. + # + # "cached_rows_warn_threshold" is the per-query threshold at which a warning will be logged. + # "cached_rows_fail_threshold" is the per-query threshold at which the query will fail. + # + # These thresholds may also be adjusted at runtime using the StorageService MBean. + # + # If the failure threshold is breached, it is likely that either the current page/fetch size + # is too large or one or more replicas are severely out-of-sync and in need of repair. + cached_rows_warn_threshold: 2000 + cached_rows_fail_threshold: 32000 + # Granularity of the collation index of rows within a partition. # Increase if your rows are large, or if you have a very large # number of rows per partition. The competing goals are these: diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 6003bd1..2218ee2 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -280,6 +280,8 @@ public class Config public volatile int tombstone_warn_threshold = 1000; public volatile int tombstone_failure_threshold = 100000; + public final ReplicaFilteringProtectionOptions replica_filtering_protection = new ReplicaFilteringProtectionOptions(); + public volatile Long index_summary_capacity_in_mb; public volatile int index_summary_resize_interval_in_minutes = 60; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 4b732c2..9369229 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1368,6 +1368,26 @@ public class DatabaseDescriptor conf.tombstone_failure_threshold = threshold; } + public static int getCachedReplicaRowsWarnThreshold() + { + return conf.replica_filtering_protection.cached_rows_warn_threshold; + } + + public static void setCachedReplicaRowsWarnThreshold(int threshold) + { + conf.replica_filtering_protection.cached_rows_warn_threshold = threshold; + } + + public static int getCachedReplicaRowsFailThreshold() + { + return conf.replica_filtering_protection.cached_rows_fail_threshold; + } + + public static void setCachedReplicaRowsFailThreshold(int threshold) + { + conf.replica_filtering_protection.cached_rows_fail_threshold = threshold; + } + /** * size of commitlog segments to allocate */ diff --git a/src/java/org/apache/cassandra/config/ReplicaFilteringProtectionOptions.java b/src/java/org/apache/cassandra/config/ReplicaFilteringProtectionOptions.java new file mode 100644 index 0000000..7a755ab --- /dev/null +++ b/src/java/org/apache/cassandra/config/ReplicaFilteringProtectionOptions.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.config; + +public class ReplicaFilteringProtectionOptions +{ + public static final int DEFAULT_WARN_THRESHOLD = 2000; + public static final int DEFAULT_FAIL_THRESHOLD = 32000; + + public volatile int cached_rows_warn_threshold = DEFAULT_WARN_THRESHOLD; + public volatile int cached_rows_fail_threshold = DEFAULT_FAIL_THRESHOLD; +} diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java index a3cf746..c7f20c5 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java @@ -18,10 +18,8 @@ package org.apache.cassandra.db.partitions; import java.util.*; -import java.security.MessageDigest; import org.apache.cassandra.db.EmptyIterators; -import org.apache.cassandra.db.transform.FilteredPartitions; import org.apache.cassandra.db.transform.MorePartitions; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.utils.AbstractIterator; @@ -98,6 +96,21 @@ public abstract class PartitionIterators } /** + * Consumes all rows in the next partition of the provided partition iterator. + */ + public static void consumeNext(PartitionIterator iterator) + { + if (iterator.hasNext()) + { + try (RowIterator partition = iterator.next()) + { + while (partition.hasNext()) + partition.next(); + } + } + } + + /** * Wraps the provided iterator so it logs the returned rows for debugging purposes. * <p> * Note that this is only meant for debugging as this can log a very large amount of @@ -116,6 +129,38 @@ public abstract class PartitionIterators return Transformation.apply(iterator, new Logger()); } + /** + * Wraps the provided iterator to run a specified action on close. Note that the action will be + * run even if closure of the provided iterator throws an exception. 
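+     * <p> + * For example, DataResolver below uses {@code doOnClose(completedPartitions, firstPhasePartitions::close)} + * so that the first-phase iterator is closed even if closing {@code completedPartitions} throws.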
+ */ + public static PartitionIterator doOnClose(PartitionIterator delegate, Runnable action) + { + return new PartitionIterator() + { + public void close() + { + try + { + delegate.close(); + } + finally + { + action.run(); + } + } + + public boolean hasNext() + { + return delegate.hasNext(); + } + + public RowIterator next() + { + return delegate.next(); + } + }; + } + private static class SingletonPartitionIterator extends AbstractIterator<RowIterator> implements PartitionIterator { private final RowIterator iterator; diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java index bff910e..4af53e2 100644 --- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java @@ -27,7 +27,6 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.transform.FilteredPartitions; -import org.apache.cassandra.db.transform.MorePartitions; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -78,24 +77,6 @@ public abstract class UnfilteredPartitionIterators return Transformation.apply(toReturn, new Close()); } - public static UnfilteredPartitionIterator concat(final List<UnfilteredPartitionIterator> iterators) - { - if (iterators.size() == 1) - return iterators.get(0); - - class Extend implements MorePartitions<UnfilteredPartitionIterator> - { - int i = 1; - public UnfilteredPartitionIterator moreContents() - { - if (i >= iterators.size()) - return null; - return iterators.get(i++); - } - } - return MorePartitions.extend(iterators.get(0), new Extend()); - } - public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec) { return FilteredPartitions.filter(iterator, nowInSec); diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java b/src/java/org/apache/cassandra/db/rows/EncodingStats.java index 955ffc7..c1235e4 100644 --- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java +++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.rows; import java.io.IOException; import java.util.*; +import java.util.function.Function; import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; @@ -107,6 +108,29 @@ public class EncodingStats return new EncodingStats(minTimestamp, minDelTime, minTTL); } + /** + * Merge one or more EncodingStats, lazily materialized from a list of values of arbitrary type by the provided function. + */ + public static <V, F extends Function<V, EncodingStats>> EncodingStats merge(List<V> values, F function) + { + if (values.size() == 1) + return function.apply(values.get(0)); + + Collector collector = new Collector(); + for (int i = 0, iSize = values.size(); i < iSize; i++) + { + V v = values.get(i); + EncodingStats stats = function.apply(v); + if (stats.minTimestamp != TIMESTAMP_EPOCH) + collector.updateTimestamp(stats.minTimestamp); + if (stats.minLocalDeletionTime != DELETION_TIME_EPOCH) + collector.updateLocalDeletionTime(stats.minLocalDeletionTime); + if (stats.minTTL != TTL_EPOCH) + collector.updateTTL(stats.minTTL); + } + return collector.get(); + } + @Override public boolean equals(Object o) { diff
--git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index 1f4803e..a551a78 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import com.codahale.metrics.*; import com.codahale.metrics.Timer; import com.google.common.collect.Maps; + import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Memtable; @@ -153,7 +154,16 @@ public class TableMetrics public final Meter readRepairRequests; public final Meter shortReadProtectionRequests; - public final Meter replicaSideFilteringProtectionRequests; + + public final Meter replicaFilteringProtectionRequests; + + /** + * This histogram records the maximum number of rows {@link org.apache.cassandra.service.ReplicaFilteringProtection} + * caches at a point in time per query. With no replica divergence, this is equivalent to the maximum number of + * cached rows in a single partition during a query. It can be helpful when choosing appropriate values for the + * replica_filtering_protection thresholds in cassandra.yaml. + */ + public final Histogram rfpRowsCachedPerQuery; public final Map<Sampler, TopKSampler<ByteBuffer>> samplers; /** @@ -652,7 +662,8 @@ public class TableMetrics readRepairRequests = createTableMeter("ReadRepairRequests"); shortReadProtectionRequests = createTableMeter("ShortReadProtectionRequests"); - replicaSideFilteringProtectionRequests = createTableMeter("ReplicaSideFilteringProtectionRequests"); + replicaFilteringProtectionRequests = createTableMeter("ReplicaFilteringProtectionRequests"); + rfpRowsCachedPerQuery = createHistogram("ReplicaFilteringProtectionRowsCachedPerQuery", true); } public void updateSSTableIterated(int count) @@ -771,6 +782,13 @@ public class TableMetrics register(name, alias, tableMeter); return tableMeter; } + + private Histogram createHistogram(String name, boolean considerZeroes) + { + Histogram histogram = Metrics.histogram(factory.createMetricName(name), aliasFactory.createMetricName(name), considerZeroes); + register(name, name, histogram); + return histogram; + } /** * Create a histogram-like interface that will register both a CF, keyspace and global level diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index 02d355e..1d0bb47 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -49,7 +49,7 @@ public class DataResolver extends ResponseResolver Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations"); @VisibleForTesting - final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>()); + final List<AsyncOneResponse<?>> repairResults = Collections.synchronizedList(new ArrayList<>()); private final boolean enforceStrictLiveness; @@ -135,7 +135,7 @@ public class DataResolver extends ResponseResolver UnfilteredPartitionIterator originalResponse = responses.get(i).payload.makeIterator(command); return context.needShortReadProtection() - ? extendWithShortReadProtection(originalResponse, context.sources[i], context.mergedResultCounter) + ? 
extendWithShortReadProtection(originalResponse, context, i) : originalResponse; } @@ -150,20 +150,20 @@ public class DataResolver extends ResponseResolver { // Protecting against inconsistent replica filtering (some replica returning a row that is outdated but that // wouldn't be removed by normal reconciliation because up-to-date replica have filtered the up-to-date version - // of that row) works in 3 steps: - // 1) we read the full response just to collect rows that may be outdated (the ones we got from some - // replica but didn't got any response for other; it could be those other replica have filtered a more - // up-to-date result). In doing so, we do not count any of such "potentially outdated" row towards the - // query limit. This simulate the worst case scenario where all those "potentially outdated" rows are - // indeed outdated, and thus make sure we are guaranteed to read enough results (thanks to short read - // protection). - // 2) we query all the replica/rows we need to rule out whether those "potentially outdated" rows are outdated - // or not. - // 3) we re-read cached copies of each replica response using the "normal" read path merge with read-repair, - // but where for each replica we use their original response _plus_ the additional rows queried in the - // previous step (and apply the command#rowFilter() on the full result). Since the first phase has - // pessimistically collected enough results for the case where all potentially outdated results are indeed - // outdated, we shouldn't need further short-read protection requests during this phase. + // of that row) involves 3 main elements: + // 1) We combine short-read protection and a merge listener that identifies potentially "out-of-date" + // rows to create an iterator that is guaranteed to produce enough valid row results to satisfy the query + // limit if enough actually exist. A row is considered out-of-date if its merged form is non-empty and we + // receive no response from at least one replica. In this case, it is possible that filtering at the + // "silent" replica has produced a more up-to-date result. + // 2) This iterator is passed to the standard resolution process with read-repair, but is first wrapped in a + // response provider that lazily "completes" potentially out-of-date rows by directly querying them on the + // replicas that were previously silent. As this iterator is consumed, it caches valid data for potentially + // out-of-date rows, and this cached data is merged with the fetched data as rows are requested. If there + // is no replica divergence, only rows in the partition being evaluated will be cached (then released + // when the partition is consumed). + // 3) After a "complete" row is materialized, it must pass the row filter supplied by the original query + // before it counts against the limit. // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here // at the beginning of this method), so grab the response count once and use that through the method.
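To make step 1 concrete, the test reduces to roughly the following (a condensed, illustrative sketch; the authoritative logic is the merge listener installed by ReplicaFilteringProtection#mergeController, shown later in this patch):

    // Sketch only: mirrors the onMergedRows() logic below; the method name is illustrative.
    static boolean isPotentiallyOutdated(Row merged, Row[] versions)
    {
        if (merged.isEmpty())
            return false;

        boolean isStatic = merged.isStatic();
        for (Row version : versions)
        {
            // A replica is "silent" for this row if it sent no version of it (or an empty
            // static row); filtering at that replica may have hidden a newer version.
            if (version == null || (isStatic && version.isEmpty()))
                return true;
        }
        return false;
    }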
@@ -171,25 +171,25 @@ public class DataResolver extends ResponseResolver // We need separate contexts, as each context has its own counter ResolveContext firstPhaseContext = new ResolveContext(count); ResolveContext secondPhaseContext = new ResolveContext(count); - ReplicaFilteringProtection rfp = new ReplicaFilteringProtection(keyspace, command, consistency, firstPhaseContext.sources); + + ReplicaFilteringProtection rfp = new ReplicaFilteringProtection(keyspace, + command, + consistency, + firstPhaseContext.sources, + DatabaseDescriptor.getCachedReplicaRowsWarnThreshold(), + DatabaseDescriptor.getCachedReplicaRowsFailThreshold()); + PartitionIterator firstPhasePartitions = resolveInternal(firstPhaseContext, rfp.mergeController(), i -> shortReadProtectedResponse(i, firstPhaseContext), UnaryOperator.identity()); - - // Consume the first phase partitions to populate the replica filtering protection with both those materialized - // partitions and the primary keys to be fetched. - PartitionIterators.consume(firstPhasePartitions); - firstPhasePartitions.close(); - - // After reading the entire query results the protection helper should have cached all the partitions so we can - // clear the responses accumulator for the sake of memory usage, given that the second phase might take long if - // it needs to query replicas. - responses.clearUnsafe(); - - return resolveWithReadRepair(secondPhaseContext, - rfp::queryProtectedPartitions, - results -> command.rowFilter().filter(results, command.metadata(), command.nowInSec())); + + PartitionIterator completedPartitions = resolveWithReadRepair(secondPhaseContext, + i -> rfp.queryProtectedPartitions(firstPhasePartitions, i), + results -> command.rowFilter().filter(results, command.metadata(), command.nowInSec())); + + // Ensure that the RFP instance has a chance to record metrics when the iterator closes. + return PartitionIterators.doOnClose(completedPartitions, firstPhasePartitions::close); } private PartitionIterator resolveInternal(ResolveContext context, @@ -217,8 +217,8 @@ public class DataResolver extends ResponseResolver */ UnfilteredPartitionIterator merged = UnfilteredPartitionIterators.merge(results, command.nowInSec(), mergeListener); - FilteredPartitions filtered = - FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness())); + Filter filter = new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()); + FilteredPartitions filtered = FilteredPartitions.filter(merged, filter); PartitionIterator counted = Transformation.apply(preCountFilter.apply(filtered), context.mergedResultCounter); return command.isForThrift() @@ -598,14 +598,17 @@ public class DataResolver extends ResponseResolver } private UnfilteredPartitionIterator extendWithShortReadProtection(UnfilteredPartitionIterator partitions, - InetAddress source, - DataLimits.Counter mergedResultCounter) + ResolveContext context, + int i) { DataLimits.Counter singleResultCounter = command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition(), enforceStrictLiveness).onlyCount(); - ShortReadPartitionsProtection protection = - new ShortReadPartitionsProtection(source, singleResultCounter, mergedResultCounter); + // The pre-fetch callback used here makes the initial round of responses for this replica collectable.
+ ShortReadPartitionsProtection protection = new ShortReadPartitionsProtection(context.sources[i], + () -> responses.clearUnsafe(i), + singleResultCounter, + context.mergedResultCounter); /* * The order of extension and transformations is important here. Extending with more partitions has to happen @@ -642,6 +645,7 @@ public class DataResolver extends ResponseResolver private class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator> { private final InetAddress source; + private final Runnable preFetchCallback; // called immediately before fetching more contents private final DataLimits.Counter singleResultCounter; // unmerged per-source counter private final DataLimits.Counter mergedResultCounter; // merged end-result counter @@ -650,9 +654,13 @@ public class DataResolver extends ResponseResolver private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call - private ShortReadPartitionsProtection(InetAddress source, DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter) + private ShortReadPartitionsProtection(InetAddress source, + Runnable preFetchCallback, + DataLimits.Counter singleResultCounter, + DataLimits.Counter mergedResultCounter) { this.source = source; + this.preFetchCallback = preFetchCallback; this.singleResultCounter = singleResultCounter; this.mergedResultCounter = mergedResultCounter; } @@ -723,6 +731,9 @@ public class DataResolver extends ResponseResolver ColumnFamilyStore.metricsFor(command.metadata().cfId).shortReadProtectionRequests.mark(); Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source); + // If we've arrived here, all responses have been consumed, and we're about to request more.
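+ // Here the callback is () -> responses.clearUnsafe(i), which nulls out this replica's initial + // response in the accumulator so it can be garbage-collected before the additional fetch + // below allocates more.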
+ preFetchCallback.run(); + PartitionRangeReadCommand cmd = makeFetchAdditionalPartitionReadCommand(toQuery); return executeReadCommand(cmd); } @@ -742,7 +753,7 @@ public class DataResolver extends ResponseResolver return cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange); } - private class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator> + private class ShortReadRowsProtection extends Transformation<UnfilteredRowIterator> implements MoreRows<UnfilteredRowIterator> { private final CFMetaData metadata; private final DecoratedKey partitionKey; diff --git a/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java index 36d51cc..0a57e66 100644 --- a/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java +++ b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java @@ -19,15 +19,15 @@ package org.apache.cassandra.service; import java.net.InetAddress; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.NavigableSet; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.stream.Collectors; +import java.util.concurrent.TimeUnit; +import java.util.Queue; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +49,8 @@ import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterators; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.db.rows.EncodingStats; @@ -58,11 +60,13 @@ import org.apache.cassandra.db.rows.Rows; import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.exceptions.OverloadedException; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.exceptions.UnavailableException; import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.btree.BTreeSet; /** @@ -74,11 +78,15 @@ import org.apache.cassandra.utils.btree.BTreeSet; * the rows in a replica response that don't have a corresponding row in other replica responses, and requests them by * primary key to the "silent" replicas in a second fetch round. * <p> - * See CASSANDRA-8272 and CASSANDRA-8273 for further details. + * See CASSANDRA-8272, CASSANDRA-8273, and CASSANDRA-15907 for further details. */ class ReplicaFilteringProtection { private static final Logger logger = LoggerFactory.getLogger(ReplicaFilteringProtection.class); + private static final NoSpamLogger oneMinuteLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES); + + private static final Function<UnfilteredRowIterator, EncodingStats> NULL_TO_NO_STATS = + rowIterator -> rowIterator == null ? 
EncodingStats.NO_STATS : rowIterator.stats(); private final Keyspace keyspace; private final ReadCommand command; @@ -86,106 +94,42 @@ class ReplicaFilteringProtection private final InetAddress[] sources; private final TableMetrics tableMetrics; - /** - * Per-source primary keys of the rows that might be outdated so they need to be fetched. - * For outdated static rows we use an empty builder to signal it has to be queried. - */ - private final List<SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>>> rowsToFetch; + private final int cachedRowsWarnThreshold; + private final int cachedRowsFailThreshold; + + /** Tracks whether or not we've already hit the warning threshold while evaluating a partition. */ + private boolean hitWarningThreshold = false; + + private int currentRowsCached = 0; // tracks the current number of cached rows + private int maxRowsCached = 0; // tracks the high watermark for the number of cached rows /** - * Per-source list of all the partitions seen by the merge listener, to be merged with the extra fetched rows. + * Per-source list of the pending partitions seen by the merge listener, to be merged with the extra fetched rows. */ - private final List<List<PartitionBuilder>> originalPartitions; + private final List<Queue<PartitionBuilder>> originalPartitions; ReplicaFilteringProtection(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, - InetAddress[] sources) + InetAddress[] sources, + int cachedRowsWarnThreshold, + int cachedRowsFailThreshold) { this.keyspace = keyspace; this.command = command; this.consistency = consistency; this.sources = sources; - this.rowsToFetch = new ArrayList<>(sources.length); this.originalPartitions = new ArrayList<>(sources.length); - for (InetAddress ignored : sources) + for (int i = 0; i < sources.length; i++) { - rowsToFetch.add(new TreeMap<>()); - originalPartitions.add(new ArrayList<>()); + originalPartitions.add(new ArrayDeque<>()); } tableMetrics = ColumnFamilyStore.metricsFor(command.metadata().cfId); - } - private BTreeSet.Builder<Clustering> getOrCreateToFetch(int source, DecoratedKey partitionKey) - { - return rowsToFetch.get(source).computeIfAbsent(partitionKey, k -> BTreeSet.builder(command.metadata().comparator)); - } - - /** - * Returns the protected results for the specified replica. These are generated fetching the extra rows and merging - * them with the cached original filtered results for that replica. 
- * - * @param source the source - * @return the protected results for the specified replica - */ - UnfilteredPartitionIterator queryProtectedPartitions(int source) - { - UnfilteredPartitionIterator original = makeIterator(originalPartitions.get(source)); - SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>> toFetch = rowsToFetch.get(source); - - if (toFetch.isEmpty()) - return original; - - // TODO: this would be more efficient if we had multi-key queries internally - List<UnfilteredPartitionIterator> fetched = toFetch.keySet() - .stream() - .map(k -> querySourceOnKey(source, k)) - .collect(Collectors.toList()); - - return UnfilteredPartitionIterators.merge(Arrays.asList(original, UnfilteredPartitionIterators.concat(fetched)), - command.nowInSec(), null); - } - - private UnfilteredPartitionIterator querySourceOnKey(int i, DecoratedKey key) - { - BTreeSet.Builder<Clustering> builder = rowsToFetch.get(i).get(key); - assert builder != null; // We're calling this on the result of rowsToFetch.get(i).keySet() - - InetAddress source = sources[i]; - NavigableSet<Clustering> clusterings = builder.build(); - tableMetrics.replicaSideFilteringProtectionRequests.mark(); - if (logger.isTraceEnabled()) - logger.trace("Requesting rows {} in partition {} from {} for replica-side filtering protection", - clusterings, key, source); - Tracing.trace("Requesting {} rows in partition {} from {} for replica-side filtering protection", - clusterings.size(), key, source); - - // build the read command taking into account that we could be requesting only in the static row - DataLimits limits = clusterings.isEmpty() ? DataLimits.cqlLimits(1) : DataLimits.NONE; - ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(clusterings, command.isReversed()); - SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(), - command.nowInSec(), - command.columnFilter(), - RowFilter.NONE, - limits, - key, - filter); - try - { - return executeReadCommand(cmd, source); - } - catch (ReadTimeoutException e) - { - int blockFor = consistency.blockFor(keyspace); - throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); - } - catch (UnavailableException e) - { - int blockFor = consistency.blockFor(keyspace); - throw new UnavailableException(consistency, blockFor, blockFor - 1); - } + this.cachedRowsWarnThreshold = cachedRowsWarnThreshold; + this.cachedRowsFailThreshold = cachedRowsFailThreshold; } private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, InetAddress source) @@ -212,83 +156,124 @@ class ReplicaFilteringProtection * <p> * The listener will track both the accepted data and the primary keys of the rows that are considered outdated. * That way, once the query results have been merged using this listener, further calls to - {@link #queryProtectedPartitions(int)} will use the collected data to return a copy of the + {@link #queryProtectedPartitions(PartitionIterator, int)} will use the collected data to return a copy of the data originally collected from the specified replica, completed with the potentially outdated rows.
*/ UnfilteredPartitionIterators.MergeListener mergeController() { - return (partitionKey, versions) -> { - - PartitionBuilder[] builders = new PartitionBuilder[sources.length]; - - for (int i = 0; i < sources.length; i++) - builders[i] = new PartitionBuilder(partitionKey, columns(versions), stats(versions)); + return new UnfilteredPartitionIterators.MergeListener() + { + @Override + public void close() + { + // If we hit the failure threshold before consuming a single partition, record the current rows cached. + tableMetrics.rfpRowsCachedPerQuery.update(Math.max(currentRowsCached, maxRowsCached)); + } - return new UnfilteredRowIterators.MergeListener() + @Override + public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) { - @Override - public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) + PartitionBuilder[] builders = new PartitionBuilder[sources.length]; + PartitionColumns columns = columns(versions); + EncodingStats stats = EncodingStats.merge(versions, NULL_TO_NO_STATS); + + for (int i = 0; i < sources.length; i++) + builders[i] = new PartitionBuilder(partitionKey, sources[i], columns, stats); + + return new UnfilteredRowIterators.MergeListener() { - // cache the deletion time versions to be able to regenerate the original row iterator - for (int i = 0; i < versions.length; i++) - builders[i].setDeletionTime(versions[i]); - } + @Override + public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) + { + // cache the deletion time versions to be able to regenerate the original row iterator + for (int i = 0; i < versions.length; i++) + builders[i].setDeletionTime(versions[i]); + } - @Override - public Row onMergedRows(Row merged, Row[] versions) - { - // cache the row versions to be able to regenerate the original row iterator - for (int i = 0; i < versions.length; i++) - builders[i].addRow(versions[i]); + @Override + public Row onMergedRows(Row merged, Row[] versions) + { + // cache the row versions to be able to regenerate the original row iterator + for (int i = 0; i < versions.length; i++) + builders[i].addRow(versions[i]); - if (merged.isEmpty()) - return merged; + if (merged.isEmpty()) + return merged; - boolean isPotentiallyOutdated = false; - boolean isStatic = merged.isStatic(); - for (int i = 0; i < versions.length; i++) - { - Row version = versions[i]; - if (version == null || (isStatic && version.isEmpty())) + boolean isPotentiallyOutdated = false; + boolean isStatic = merged.isStatic(); + for (int i = 0; i < versions.length; i++) { - isPotentiallyOutdated = true; - BTreeSet.Builder<Clustering> toFetch = getOrCreateToFetch(i, partitionKey); - // Note that for static, we shouldn't add the clustering to the clustering set (the - // ClusteringIndexNamesFilter we'll build from this later does not expect it), but the fact - // we created a builder in the first place will act as a marker that the static row must be - // fetched, even if no other rows are added for this partition. - if (!isStatic) - toFetch.add(merged.clustering()); + Row version = versions[i]; + if (version == null || (isStatic && version.isEmpty())) + { + isPotentiallyOutdated = true; + builders[i].addToFetch(merged); + } } - } - // If the row is potentially outdated (because some replica didn't send anything and so it _may_ be - // an outdated result that is only present because other replica have filtered the up-to-date result - // out), then we skip the row. 
In other words, the results of the initial merging of results by this - // protection assume the worst case scenario where every row that might be outdated actually is. - // This ensures that during this first phase (collecting additional row to fetch) we are guaranteed - // to look at enough data to ultimately fulfill the query limit. - return isPotentiallyOutdated ? null : merged; - } + // If the row is potentially outdated (because some replica didn't send anything and so it _may_ be + // an outdated result that is only present because other replicas have filtered the up-to-date result + // out), then we skip the row. In other words, the results of the initial merging of results by this + // protection assume the worst case scenario where every row that might be outdated actually is. + // This ensures that during this first phase (collecting additional rows to fetch) we are guaranteed + // to look at enough data to ultimately fulfill the query limit. + return isPotentiallyOutdated ? null : merged; + } - @Override - public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) - { - // cache the marker versions to be able to regenerate the original row iterator - for (int i = 0; i < versions.length; i++) - builders[i].addRangeTombstoneMarker(versions[i]); - } + @Override + public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) + { + // cache the marker versions to be able to regenerate the original row iterator + for (int i = 0; i < versions.length; i++) + builders[i].addRangeTombstoneMarker(versions[i]); + } - @Override - public void close() - { - for (int i = 0; i < sources.length; i++) - originalPartitions.get(i).add(builders[i]); - } - }; + @Override + public void close() + { + for (int i = 0; i < sources.length; i++) + originalPartitions.get(i).add(builders[i]); + } + }; + } }; } + private void incrementCachedRows() + { + currentRowsCached++; + + if (currentRowsCached == cachedRowsFailThreshold + 1) + { + String message = String.format("Replica filtering protection has cached over %d rows during query %s. " + + "(See 'cached_replica_rows_fail_threshold' in cassandra.yaml.)", + cachedRowsFailThreshold, command.toCQLString()); + + logger.error(message); + Tracing.trace(message); + throw new OverloadedException(message); + } + else if (currentRowsCached == cachedRowsWarnThreshold + 1 && !hitWarningThreshold) + { + hitWarningThreshold = true; + + String message = String.format("Replica filtering protection has cached over %d rows during query %s. 
" + + "(See 'cached_replica_rows_warn_threshold' in cassandra.yaml.)", + cachedRowsWarnThreshold, command.toCQLString()); + + ClientWarn.instance.warn(message); + oneMinuteLogger.warn(message); + Tracing.trace(message); + } + } + + private void releaseCachedRows(int count) + { + maxRowsCached = Math.max(maxRowsCached, currentRowsCached); + currentRowsCached -= count; + } + private static PartitionColumns columns(List<UnfilteredRowIterator> versions) { Columns statics = Columns.NONE; @@ -305,24 +290,19 @@ class ReplicaFilteringProtection return new PartitionColumns(statics, regulars); } - private static EncodingStats stats(List<UnfilteredRowIterator> iterators) - { - EncodingStats stats = EncodingStats.NO_STATS; - for (UnfilteredRowIterator iter : iterators) - { - if (iter == null) - continue; - - stats = stats.mergeWith(iter.stats()); - } - return stats; - } - - private UnfilteredPartitionIterator makeIterator(List<PartitionBuilder> builders) + /** + * Returns the protected results for the specified replica. These are generated fetching the extra rows and merging + * them with the cached original filtered results for that replica. + * + * @param merged the first iteration partitions, that should have been read used with the {@link #mergeController()} + * @param source the source + * @return the protected results for the specified replica + */ + UnfilteredPartitionIterator queryProtectedPartitions(PartitionIterator merged, int source) { return new UnfilteredPartitionIterator() { - final Iterator<PartitionBuilder> iterator = builders.iterator(); + final Queue<PartitionBuilder> partitions = originalPartitions.get(source); @Override public boolean isForThrift() @@ -337,37 +317,48 @@ class ReplicaFilteringProtection } @Override - public void close() - { - // nothing to do here - } + public void close() { } @Override public boolean hasNext() { - return iterator.hasNext(); + // If there are no cached partition builders for this source, advance the first phase iterator, which + // will force the RFP merge listener to load at least the next protected partition. Note that this may + // load more than one partition if any divergence between replicas is discovered by the merge listener. 
+ if (partitions.isEmpty()) + { + PartitionIterators.consumeNext(merged); + } + + return !partitions.isEmpty(); } @Override public UnfilteredRowIterator next() { - return iterator.next().build(); + PartitionBuilder builder = partitions.poll(); + assert builder != null; + return builder.protectedPartition(); } }; } private class PartitionBuilder { - private final DecoratedKey partitionKey; + private final DecoratedKey key; + private final InetAddress source; private final PartitionColumns columns; private final EncodingStats stats; private DeletionTime deletionTime; private Row staticRow = Rows.EMPTY_STATIC_ROW; - private final List<Unfiltered> contents = new ArrayList<>(); + private final Queue<Unfiltered> contents = new ArrayDeque<>(); + private BTreeSet.Builder<Clustering> toFetch; + private int partitionRowsCached; - private PartitionBuilder(DecoratedKey partitionKey, PartitionColumns columns, EncodingStats stats) + private PartitionBuilder(DecoratedKey key, InetAddress source, PartitionColumns columns, EncodingStats stats) { - this.partitionKey = partitionKey; + this.key = key; + this.source = source; this.columns = columns; this.stats = stats; } @@ -379,6 +370,12 @@ class ReplicaFilteringProtection private void addRow(Row row) { + partitionRowsCached++; + + incrementCachedRows(); + + // Note that even null rows are counted against the row caching limit. The assumption is that + // a subsequent protection query will later fetch the row onto the heap anyway. if (row == null) return; @@ -394,12 +391,23 @@ class ReplicaFilteringProtection contents.add(marker); } - private UnfilteredRowIterator build() + private void addToFetch(Row row) + { + if (toFetch == null) + toFetch = BTreeSet.builder(command.metadata().comparator); + + // Note that for static, we shouldn't add the clustering to the clustering set (the + // ClusteringIndexNamesFilter we'll build from this later does not expect it), but the fact + // we created a builder in the first place will act as a marker that the static row must be + // fetched, even if no other rows are added for this partition. 
+ if (!row.isStatic()) + toFetch.add(row.clustering()); + } + + private UnfilteredRowIterator originalPartition() { return new UnfilteredRowIterator() { - final Iterator<Unfiltered> iterator = contents.iterator(); - @Override public DeletionTime partitionLevelDeletion() { @@ -433,7 +441,7 @@ class ReplicaFilteringProtection @Override public DecoratedKey partitionKey() { - return partitionKey; + return key; } @Override @@ -445,21 +453,82 @@ class ReplicaFilteringProtection @Override public void close() { - // nothing to do here + releaseCachedRows(partitionRowsCached); } @Override public boolean hasNext() { - return iterator.hasNext(); + return !contents.isEmpty(); } @Override public Unfiltered next() { - return iterator.next(); + return contents.poll(); } }; } + + private UnfilteredRowIterator protectedPartition() + { + UnfilteredRowIterator original = originalPartition(); + + if (toFetch != null) + { + try (UnfilteredPartitionIterator partitions = fetchFromSource()) + { + if (partitions.hasNext()) + { + try (UnfilteredRowIterator fetchedRows = partitions.next()) + { + return UnfilteredRowIterators.merge(Arrays.asList(original, fetchedRows), command.nowInSec()); + } + } + } + } + + return original; + } + + private UnfilteredPartitionIterator fetchFromSource() + { + assert toFetch != null; + + NavigableSet<Clustering> clusterings = toFetch.build(); + tableMetrics.replicaFilteringProtectionRequests.mark(); + + if (logger.isTraceEnabled()) + logger.trace("Requesting rows {} in partition {} from {} for replica filtering protection", + clusterings, key, source); + + Tracing.trace("Requesting {} rows in partition {} from {} for replica filtering protection", + clusterings.size(), key, source); + + // build the read command taking into account that we could be requesting only in the static row + DataLimits limits = clusterings.isEmpty() ? 
DataLimits.cqlLimits(1) : DataLimits.NONE; + ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(clusterings, command.isReversed()); + SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(), + command.nowInSec(), + command.columnFilter(), + RowFilter.NONE, + limits, + key, + filter); + try + { + return executeReadCommand(cmd, source); + } + catch (ReadTimeoutException e) + { + int blockFor = consistency.blockFor(keyspace); + throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); + } + catch (UnavailableException e) + { + int blockFor = consistency.blockFor(keyspace); + throw new UnavailableException(consistency, blockFor, blockFor - 1); + } + } } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index d287788..f0b183d 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -4799,6 +4799,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void setTombstoneWarnThreshold(int threshold) { DatabaseDescriptor.setTombstoneWarnThreshold(threshold); + logger.info("updated tombstone_warn_threshold to {}", threshold); } public int getTombstoneFailureThreshold() @@ -4809,6 +4810,29 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void setTombstoneFailureThreshold(int threshold) { DatabaseDescriptor.setTombstoneFailureThreshold(threshold); + logger.info("updated tombstone_failure_threshold to {}", threshold); + } + + public int getCachedReplicaRowsWarnThreshold() + { + return DatabaseDescriptor.getCachedReplicaRowsWarnThreshold(); + } + + public void setCachedReplicaRowsWarnThreshold(int threshold) + { + DatabaseDescriptor.setCachedReplicaRowsWarnThreshold(threshold); + logger.info("updated replica_filtering_protection.cached_rows_warn_threshold to {}", threshold); + } + + public int getCachedReplicaRowsFailThreshold() + { + return DatabaseDescriptor.getCachedReplicaRowsFailThreshold(); + } + + public void setCachedReplicaRowsFailThreshold(int threshold) + { + DatabaseDescriptor.setCachedReplicaRowsFailThreshold(threshold); + logger.info("updated replica_filtering_protection.cached_rows_fail_threshold to {}", threshold); } public int getBatchSizeFailureThreshold() @@ -4819,12 +4843,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void setBatchSizeFailureThreshold(int threshold) { DatabaseDescriptor.setBatchSizeFailThresholdInKB(threshold); + logger.info("updated batch_size_fail_threshold_in_kb to {}", threshold); } public void setHintedHandoffThrottleInKB(int throttleInKB) { DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB); - logger.info(String.format("Updated hinted_handoff_throttle_in_kb to %d", throttleInKB)); + logger.info("updated hinted_handoff_throttle_in_kb to {}", throttleInKB); } @VisibleForTesting diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index e22b094..1afa48e 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -604,6 +604,18 @@ public interface StorageServiceMBean extends NotificationEmitter /** Sets the threshold for abandoning queries with many tombstones */ public void setTombstoneFailureThreshold(int 
tombstoneDebugThreshold); + /** Returns the number of rows cached at the coordinator before filtering/index queries log a warning. */ + public int getCachedReplicaRowsWarnThreshold(); + + /** Sets the number of rows cached at the coordinator before filtering/index queries log a warning. */ + public void setCachedReplicaRowsWarnThreshold(int threshold); + + /** Returns the number of rows cached at the coordinator before filtering/index queries fail outright. */ + public int getCachedReplicaRowsFailThreshold(); + + /** Sets the number of rows cached at the coordinator before filtering/index queries fail outright. */ + public void setCachedReplicaRowsFailThreshold(int threshold); + /** Returns the threshold for rejecting queries due to a large batch size */ public int getBatchSizeFailureThreshold(); /** Sets the threshold for rejecting queries due to a large batch size */ diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index b560adf..d3633fd 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -437,9 +437,9 @@ public class FBUtilities } } - public static void waitOnFutures(List<AsyncOneResponse> results, long ms) throws TimeoutException + public static void waitOnFutures(List<AsyncOneResponse<?>> results, long ms) throws TimeoutException { - for (AsyncOneResponse result : results) + for (AsyncOneResponse<?> result : results) result.get(ms, TimeUnit.MILLISECONDS); } diff --git a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java index ca9bb09..15afdbe 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java @@ -18,7 +18,6 @@ */ package org.apache.cassandra.utils.concurrent; -import java.util.Arrays; import java.util.Iterator; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -138,14 +137,12 @@ public class Accumulator<E> implements Iterable<E> } /** - * Removes all of the elements from this accumulator. + * Removes the element at the specified index from this accumulator. * * This method is not thread-safe when used concurrently with {@link #add(Object)}. */ - public void clearUnsafe() + public void clearUnsafe(int i) { - nextIndexUpdater.set(this, 0); - presentCountUpdater.set(this, 0); - Arrays.fill(values, null); + values[i] = null; } } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java index 2f2b525..51a08e6 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java @@ -39,6 +39,7 @@ import org.apache.cassandra.distributed.api.QueryResult; import org.apache.cassandra.distributed.api.QueryResults; import org.apache.cassandra.distributed.api.SimpleQueryResult; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.pager.QueryPager; import org.apache.cassandra.transport.Server; @@ -91,6 +92,11 @@ public class Coordinator implements ICoordinator boundBBValues.add(ByteBufferUtil.objectToBytes(boundValue)); prepared.validate(QueryState.forInternalCalls().getClientState()); + + // Start capturing warnings on this thread.
Note that this will implicitly clear out any previous + // warnings as it sets a new State instance on the ThreadLocal. + ClientWarn.instance.captureWarnings(); + ResultMessage res = prepared.execute(QueryState.forInternalCalls(), QueryOptions.create(toCassandraCL(consistencyLevel), boundBBValues, @@ -100,6 +106,10 @@ public class Coordinator implements ICoordinator null, Server.CURRENT_VERSION)); + // Collect warnings reported during the query. + if (res != null) + res.setWarnings(ClientWarn.instance.getWarnings()); + return RowUtil.toQueryResult(res); } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java index 50d501e..876ce29 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java @@ -19,6 +19,7 @@ package org.apache.cassandra.distributed.impl; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -41,7 +42,11 @@ public class RowUtil ResultMessage.Rows rows = (ResultMessage.Rows) res; String[] names = getColumnNames(rows.result.metadata.names); Object[][] results = RowUtil.toObjects(rows); - return new SimpleQueryResult(names, results); + + // Warnings may be null here, due to ClientWarn#getWarnings() handling of empty warning lists. + List<String> warnings = res.getWarnings(); + + return new SimpleQueryResult(names, results, warnings == null ? Collections.emptyList() : warnings); } else { diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java new file mode 100644 index 0000000..2a847bf --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.util.List; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.exceptions.OverloadedException; +import org.apache.cassandra.service.StorageService; + +import static org.apache.cassandra.config.ReplicaFilteringProtectionOptions.DEFAULT_FAIL_THRESHOLD; +import static org.apache.cassandra.config.ReplicaFilteringProtectionOptions.DEFAULT_WARN_THRESHOLD; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL; +import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; +import static org.apache.cassandra.distributed.shared.AssertUtils.row; +import static org.junit.Assert.assertEquals; + +/** + * Exercises the functionality of {@link org.apache.cassandra.service.ReplicaFilteringProtection}, the + * mechanism that ensures distributed index and filtering queries at read consistency levels > ONE/LOCAL_ONE + * avoid stale replica results. + */ +public class ReplicaFilteringProtectionTest extends TestBaseImpl +{ + private static final int REPLICAS = 2; + private static final int ROWS = 3; + + private static Cluster cluster; + + @BeforeClass + public static void setup() throws IOException + { + cluster = init(Cluster.build() + .withNodes(REPLICAS) + .withConfig(config -> config.set("hinted_handoff_enabled", false) + .set("commitlog_sync", "batch") + .set("num_tokens", 1)).start()); + + // Make sure we start w/ the correct defaults: + cluster.get(1).runOnInstance(() -> assertEquals(DEFAULT_WARN_THRESHOLD, StorageService.instance.getCachedReplicaRowsWarnThreshold())); + cluster.get(1).runOnInstance(() -> assertEquals(DEFAULT_FAIL_THRESHOLD, StorageService.instance.getCachedReplicaRowsFailThreshold())); + } + + @AfterClass + public static void teardown() + { + cluster.close(); + } + + @Test + public void testMissedUpdatesBelowCachingWarnThreshold() + { + String tableName = "missed_updates_no_warning"; + cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int PRIMARY KEY, v text)")); + + // The warning threshold provided is exactly the total number of rows returned + // to the coordinator from all replicas and therefore should not be triggered. + testMissedUpdates(tableName, REPLICAS * ROWS, Integer.MAX_VALUE, false); + } + + @Test + public void testMissedUpdatesAboveCachingWarnThreshold() + { + String tableName = "missed_updates_cache_warn"; + cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int PRIMARY KEY, v text)")); + + // The warning threshold provided is one less than the total number of rows returned + // to the coordinator from all replicas and therefore should be triggered but not fail the query. + testMissedUpdates(tableName, REPLICAS * ROWS - 1, Integer.MAX_VALUE, true); + } + + @Test + public void testMissedUpdatesAroundCachingFailThreshold() + { + String tableName = "missed_updates_cache_fail"; + cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int PRIMARY KEY, v text)")); + + // The failure threshold provided is exactly the total number of rows returned + // to the coordinator from all replicas and therefore should just warn.
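+        // (With REPLICAS = 2 and ROWS = 3, a query caches at most 6 rows, and a threshold + // only trips once the number of cached rows exceeds it.)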
+ testMissedUpdates(tableName, 1, REPLICAS * ROWS, true); + + try + { + // The failure threshold provided is one less than the total number of rows returned + // to the coordinator from all replicas and therefore should fail the query. + testMissedUpdates(tableName, 1, REPLICAS * ROWS - 1, true); + fail("Expected the query to fail after exceeding the failure threshold"); + } + catch (RuntimeException e) + { + assertEquals(OverloadedException.class.getName(), e.getCause().getClass().getName()); + } + } + + private void testMissedUpdates(String tableName, int warnThreshold, int failThreshold, boolean shouldWarn) + { + cluster.get(1).runOnInstance(() -> StorageService.instance.setCachedReplicaRowsWarnThreshold(warnThreshold)); + cluster.get(1).runOnInstance(() -> StorageService.instance.setCachedReplicaRowsFailThreshold(failThreshold)); + + String fullTableName = KEYSPACE + '.' + tableName; + + // Case 1: Insert and query rows at ALL to establish a baseline. + for (int i = 0; i < ROWS; i++) + { + cluster.coordinator(1).execute("INSERT INTO " + fullTableName + "(k, v) VALUES (?, 'old')", ALL, i); + } + + long histogramSampleCount = rowsCachedPerQueryCount(cluster.get(1), tableName); + + String query = "SELECT * FROM " + fullTableName + " WHERE v = ? LIMIT ? ALLOW FILTERING"; + + Object[][] initialRows = cluster.coordinator(1).execute(query, ALL, "old", ROWS); + assertRows(initialRows, row(1, "old"), row(0, "old"), row(2, "old")); + + // Make sure only one sample was recorded for the query. + assertEquals(histogramSampleCount + 1, rowsCachedPerQueryCount(cluster.get(1), tableName)); + + // Case 2: Update all rows on only one replica, leaving the entire dataset of the remaining replica out-of-date. + updateAllRowsOn(1, fullTableName, "new"); + + // The replica that missed the updates creates a mismatch at every row, and we therefore cache a version + // of that row for all replicas. + SimpleQueryResult oldResult = cluster.coordinator(1).executeWithResult(query, ALL, "old", ROWS); + assertRows(oldResult.toObjectArrays()); + verifyWarningState(shouldWarn, oldResult); + + // We should have made 3 row "completion" requests. + assertEquals(ROWS, protectionQueryCount(cluster.get(1), tableName)); + + // In all cases above, the queries should be caching 1 row per partition per replica, but + // 6 for the whole query, given every row is potentially stale. + assertEquals(ROWS * REPLICAS, maxRowsCachedPerQuery(cluster.get(1), tableName)); + + // Make sure only one more sample was recorded for the query. + assertEquals(histogramSampleCount + 2, rowsCachedPerQueryCount(cluster.get(1), tableName)); + + // Case 3: Observe the effects of blocking read-repair. + + // The previous query performs a blocking read-repair, which removes replica divergence. This + // will only warn, therefore, if the warning threshold is actually below the number of replicas. + // (i.e. The row cache counter is decremented/reset as each partition is consumed.) + SimpleQueryResult newResult = cluster.coordinator(1).executeWithResult(query, ALL, "new", ROWS); + Object[][] newRows = newResult.toObjectArrays(); + assertRows(newRows, row(1, "new"), row(0, "new"), row(2, "new")); + + verifyWarningState(warnThreshold < REPLICAS, newResult); + + // We should still only have made 3 row "completion" requests, with no replica divergence in the last query. + assertEquals(ROWS, protectionQueryCount(cluster.get(1), tableName)); + + // With no replica divergence, we only cache a single partition at a time across 2 replicas.
+ assertEquals(REPLICAS, minRowsCachedPerQuery(cluster.get(1), tableName)); + + // Make sure only one more sample was recorded for the query. + assertEquals(histogramSampleCount + 3, rowsCachedPerQueryCount(cluster.get(1), tableName)); + + // Case 4: Introduce another mismatch by updating all rows on only one replica. + + updateAllRowsOn(1, fullTableName, "future"); + + // Another mismatch is introduced, and we once again cache a version of each row during resolution. + SimpleQueryResult futureResult = cluster.coordinator(1).executeWithResult(query, ALL, "future", ROWS); + Object[][] futureRows = futureResult.toObjectArrays(); + assertRows(futureRows, row(1, "future"), row(0, "future"), row(2, "future")); + + verifyWarningState(shouldWarn, futureResult); + + // We should have made 3 more row "completion" requests. + assertEquals(ROWS * 2, protectionQueryCount(cluster.get(1), tableName)); + + // In all cases above, the queries should be caching 1 row per partition per replica, but 6 for the + // whole query, given every row is potentially stale. + assertEquals(ROWS * REPLICAS, maxRowsCachedPerQuery(cluster.get(1), tableName)); + + // Make sure only one more sample was recorded for the query. + assertEquals(histogramSampleCount + 4, rowsCachedPerQueryCount(cluster.get(1), tableName)); + } + + private void updateAllRowsOn(int node, String table, String value) + { + for (int i = 0; i < ROWS; i++) + { + cluster.get(node).executeInternal("UPDATE " + table + " SET v = ? WHERE k = ?", value, i); + } + } + + private void verifyWarningState(boolean shouldWarn, SimpleQueryResult result) + { + List<String> warnings = result.warnings(); + assertEquals(shouldWarn, warnings.stream().anyMatch(w -> w.contains("cached_replica_rows_warn_threshold"))); + assertEquals(shouldWarn ? 1 : 0, warnings.size()); + } + + private long protectionQueryCount(IInvokableInstance instance, String tableName) + { + return instance.callOnInstance(() -> Keyspace.open(KEYSPACE) + .getColumnFamilyStore(tableName) + .metric.replicaFilteringProtectionRequests.getCount()); + } + + private long maxRowsCachedPerQuery(IInvokableInstance instance, String tableName) + { + return instance.callOnInstance(() -> Keyspace.open(KEYSPACE) + .getColumnFamilyStore(tableName) + .metric.rfpRowsCachedPerQuery.getSnapshot().getMax()); + } + + private long minRowsCachedPerQuery(IInvokableInstance instance, String tableName) + { + return instance.callOnInstance(() -> Keyspace.open(KEYSPACE) + .getColumnFamilyStore(tableName) + .metric.rfpRowsCachedPerQuery.getSnapshot().getMin()); + } + + private long rowsCachedPerQueryCount(IInvokableInstance instance, String tableName) + { + return instance.callOnInstance(() -> Keyspace.open(KEYSPACE) + .getColumnFamilyStore(tableName) + .metric.rfpRowsCachedPerQuery.getCount()); + } +} diff --git a/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java b/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java new file mode 100644 index 0000000..559ce10 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.rows; + +import java.util.function.Function; + +import com.google.common.collect.ImmutableList; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.db.LivenessInfo; + +public class EncodingStatsTest +{ + @Test + public void testCollectWithNoStats() + { + EncodingStats none = EncodingStats.merge(ImmutableList.of( + EncodingStats.NO_STATS, + EncodingStats.NO_STATS, + EncodingStats.NO_STATS + ), Function.identity()); + Assert.assertEquals(none, EncodingStats.NO_STATS); + } + + @Test + public void testCollectWithNoStatsWithEmpty() + { + EncodingStats none = EncodingStats.merge(ImmutableList.of( + EncodingStats.NO_STATS, + EncodingStats.NO_STATS, + new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 0) + ), Function.identity()); + Assert.assertEquals(none, EncodingStats.NO_STATS); + } + + @Test + public void testCollectWithNoStatsWithTimestamp() + { + EncodingStats single = new EncodingStats(1, LivenessInfo.NO_EXPIRATION_TIME, 0); + EncodingStats result = EncodingStats.merge(ImmutableList.of( + EncodingStats.NO_STATS, + EncodingStats.NO_STATS, + single, + EncodingStats.NO_STATS + ), Function.identity()); + Assert.assertEquals(single, result); + } + + @Test + public void testCollectWithNoStatsWithExpires() + { + EncodingStats single = new EncodingStats(LivenessInfo.NO_TIMESTAMP, 1, 0); + EncodingStats result = EncodingStats.merge(ImmutableList.of( + EncodingStats.NO_STATS, + single, + EncodingStats.NO_STATS + ), Function.identity()); + Assert.assertEquals(single, result); + } + + @Test + public void testCollectWithNoStatsWithTTL() + { + EncodingStats single = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 1); + EncodingStats result = EncodingStats.merge(ImmutableList.of( + EncodingStats.NO_STATS, + single, + EncodingStats.NO_STATS + ), Function.identity()); + Assert.assertEquals(single, result); + } + + @Test + public void testCollectOneEach() + { + EncodingStats tsp = new EncodingStats(1, LivenessInfo.NO_EXPIRATION_TIME, 0); + EncodingStats exp = new EncodingStats(LivenessInfo.NO_TIMESTAMP, 1, 0); + EncodingStats ttl = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 1); + EncodingStats result = EncodingStats.merge(ImmutableList.of( + tsp, + exp, + ttl + ), Function.identity()); + Assert.assertEquals(new EncodingStats(1, 1, 1), result); + } + + @Test + public void testTimestamp() + { + EncodingStats one = new EncodingStats(1, LivenessInfo.NO_EXPIRATION_TIME, 0); + EncodingStats two = new EncodingStats(2, LivenessInfo.NO_EXPIRATION_TIME, 0); + EncodingStats thr = new EncodingStats(3, LivenessInfo.NO_EXPIRATION_TIME, 0); + EncodingStats result = EncodingStats.merge(ImmutableList.of( + one, + two, + thr + ), Function.identity()); + Assert.assertEquals(one, result); + } + + @Test + public void testExpires() + { + EncodingStats one = new EncodingStats(LivenessInfo.NO_TIMESTAMP,1, 0); + EncodingStats two = new EncodingStats(LivenessInfo.NO_TIMESTAMP,2, 0); + EncodingStats thr = new EncodingStats(LivenessInfo.NO_TIMESTAMP,3, 0); + 
EncodingStats result = EncodingStats.merge(ImmutableList.of( + one, + two, + thr + ), Function.identity()); + Assert.assertEquals(one, result); + } + + @Test + public void testTTL() + { + EncodingStats one = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME,1); + EncodingStats two = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME,2); + EncodingStats thr = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME,3); + EncodingStats result = EncodingStats.merge(ImmutableList.of( + thr, + one, + two + ), Function.identity()); + Assert.assertEquals(one, result); + } +} diff --git a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java index 33daca7..ce0fe27 100644 --- a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java +++ b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java @@ -29,7 +29,7 @@ public class AccumulatorTest @Test public void testAddMoreThanCapacity() { - Accumulator<Integer> accu = new Accumulator(4); + Accumulator<Integer> accu = new Accumulator<>(4); accu.add(1); accu.add(2); @@ -50,7 +50,7 @@ public class AccumulatorTest @Test public void testIsEmptyAndSize() { - Accumulator<Integer> accu = new Accumulator(4); + Accumulator<Integer> accu = new Accumulator<>(4); assertTrue(accu.isEmpty()); assertEquals(0, accu.size()); @@ -58,20 +58,20 @@ public class AccumulatorTest accu.add(1); accu.add(2); - assertTrue(!accu.isEmpty()); + assertFalse(accu.isEmpty()); assertEquals(2, accu.size()); accu.add(3); accu.add(4); - assertTrue(!accu.isEmpty()); + assertFalse(accu.isEmpty()); assertEquals(4, accu.size()); } @Test public void testGetAndIterator() { - Accumulator<String> accu = new Accumulator(4); + Accumulator<String> accu = new Accumulator<>(4); accu.add("3"); accu.add("2"); @@ -99,32 +99,32 @@ public class AccumulatorTest @Test public void testClearUnsafe() { - Accumulator<String> accu = new Accumulator<>(3); + Accumulator<String> accu = new Accumulator<>(5); accu.add("1"); accu.add("2"); accu.add("3"); - accu.clearUnsafe(); + accu.clearUnsafe(1); - assertEquals(0, accu.size()); - assertFalse(accu.iterator().hasNext()); - assertOutOfBonds(accu, 0); + assertEquals(3, accu.size()); + assertTrue(accu.iterator().hasNext()); accu.add("4"); accu.add("5"); - assertEquals(2, accu.size()); + assertEquals(5, accu.size()); - assertEquals("4", accu.get(0)); - assertEquals("5", accu.get(1)); - assertOutOfBonds(accu, 2); + assertEquals("4", accu.get(3)); + assertEquals("5", accu.get(4)); + assertOutOfBonds(accu, 5); Iterator<String> iter = accu.iterator(); assertTrue(iter.hasNext()); - assertEquals("4", iter.next()); - assertEquals("5", iter.next()); - assertFalse(iter.hasNext()); + assertEquals("1", iter.next()); + assertNull(iter.next()); + assertTrue(iter.hasNext()); + assertEquals("3", iter.next()); } private static void assertOutOfBonds(Accumulator<String> accumulator, int index) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
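For operators picking up this change, the sketch below shows one way to adjust the new replica filtering protection thresholds on a live node over JMX, using the getCachedReplicaRowsWarnThreshold/setCachedReplicaRowsWarnThreshold and getCachedReplicaRowsFailThreshold/setCachedReplicaRowsFailThreshold pairs this patch adds to StorageService and StorageServiceMBean. This is a minimal sketch, not part of the patch: it assumes the getter/setter pairs surface as the JMX attributes "CachedReplicaRowsWarnThreshold" and "CachedReplicaRowsFailThreshold" on the standard org.apache.cassandra.db:type=StorageService MBean, and the host, port, and threshold values are placeholders for your deployment.

import javax.management.Attribute;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class RfpThresholdTuner
{
    public static void main(String[] args) throws Exception
    {
        // Cassandra's default JMX endpoint; substitute the host/port of the target node.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");

        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            ObjectName storageService = new ObjectName("org.apache.cassandra.db:type=StorageService");

            // Assumed attribute names, derived from the MBean getter/setter names above.
            // The values here are illustrative, not recommendations.
            connection.setAttribute(storageService, new Attribute("CachedReplicaRowsWarnThreshold", 1000));
            connection.setAttribute(storageService, new Attribute("CachedReplicaRowsFailThreshold", 8000));

            // Read the thresholds back to confirm the change took effect.
            System.out.println("warn threshold: " + connection.getAttribute(storageService, "CachedReplicaRowsWarnThreshold"));
            System.out.println("fail threshold: " + connection.getAttribute(storageService, "CachedReplicaRowsFailThreshold"));
        }
    }
}

As with other JMX-settable options, values changed this way take effect immediately but do not survive a restart; durable settings belong in the replica filtering protection section this patch adds to conf/cassandra.yaml.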