Repository: cassandra Updated Branches: refs/heads/trunk 4413fdbd3 -> 5d8767765
Add option to sanity check tombstones on reads/compaction Patch by marcuse; reviewed by Ariel Weisberg for CASSANDRA-14467 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5d876776 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5d876776 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5d876776 Branch: refs/heads/trunk Commit: 5d8767765090cd968c39008f76b0cd795d6e5032 Parents: 4413fdb Author: Marcus Eriksson <marc...@apache.org> Authored: Tue May 22 13:43:22 2018 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue Jun 5 12:47:20 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 3 + .../org/apache/cassandra/config/Config.java | 9 +- .../cassandra/config/DatabaseDescriptor.java | 10 + .../org/apache/cassandra/db/DeletionTime.java | 9 + .../cassandra/db/UnfilteredValidation.java | 113 ++++++++++ .../columniterator/AbstractSSTableIterator.java | 2 + .../db/columniterator/SSTableIterator.java | 2 + .../columniterator/SSTableReversedIterator.java | 1 + .../apache/cassandra/db/rows/AbstractCell.java | 7 + .../apache/cassandra/db/rows/AbstractRow.java | 12 + .../apache/cassandra/db/rows/ColumnData.java | 7 + .../cassandra/db/rows/ComplexColumnData.java | 10 + .../db/rows/RangeTombstoneBoundMarker.java | 5 + .../db/rows/RangeTombstoneBoundaryMarker.java | 5 + .../apache/cassandra/db/rows/Unfiltered.java | 6 + .../io/sstable/SSTableIdentityIterator.java | 6 +- .../io/sstable/format/SSTableReader.java | 6 + .../cassandra/service/StorageService.java | 12 + .../cassandra/service/StorageServiceMBean.java | 2 + test/conf/cassandra.yaml | 1 + .../config/DatabaseDescriptorRefTest.java | 1 + .../cql3/validation/operations/TTLTest.java | 19 ++ .../db/compaction/CompactionsCQLTest.java | 223 +++++++++++++++++++ .../sstable/SSTableCorruptionDetectionTest.java | 5 + 25 files changed, 475 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 351ae37..eb064be 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Add option to sanity check tombstones on reads/compactions (CASSANDRA-14467) * Add a virtual table to expose all running sstable tasks (CASSANDRA-14457) * Let nodetool import take a list of directories (CASSANDRA-14442) * Avoid unneeded memory allocations / cpu for disabled log levels (CASSANDRA-14488) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 49c6f03..7ff056d 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -1198,3 +1198,6 @@ audit_logging_options: # included_users: # excluded_users: +# validate tombstones on reads and compaction +# can be either "disabled", "warn" or "exception" +# corrupted_tombstone_strategy: disabled http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index d945368..d9250bb 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -383,7 +383,7 @@ public class Config public volatile AuditLogOptions audit_logging_options = new AuditLogOptions(); - + public CorruptedTombstoneStrategy corrupted_tombstone_strategy = CorruptedTombstoneStrategy.disabled; /** * @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()} */ @@ -468,6 +468,13 @@ public class Config reject } + public enum CorruptedTombstoneStrategy + { + disabled, + warn, + exception + } + private static final List<String> SENSITIVE_KEYS = new ArrayList<String>() {{ add("client_encryption_options"); add("server_encryption_options"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 592b96e..91ee63a 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -2596,4 +2596,14 @@ public class DatabaseDescriptor { conf.audit_logging_options = auditLoggingOptions; } + + public static Config.CorruptedTombstoneStrategy getCorruptedTombstoneStrategy() + { + return conf.corrupted_tombstone_strategy; + } + + public static void setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy strategy) + { + conf.corrupted_tombstone_strategy = strategy; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/DeletionTime.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java index 9dcbb07..14e846d 100644 --- a/src/java/org/apache/cassandra/db/DeletionTime.java +++ b/src/java/org/apache/cassandra/db/DeletionTime.java @@ -88,6 +88,15 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory HashingUtils.updateWithLong(hasher, markedForDeleteAt()); } + /** + * check if this deletion time is valid - localDeletionTime can never be negative + * @return true if it is valid + */ + public boolean validate() + { + return localDeletionTime >= 0; + } + @Override public boolean equals(Object o) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/UnfilteredValidation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/UnfilteredValidation.java b/src/java/org/apache/cassandra/db/UnfilteredValidation.java new file mode 100644 index 0000000..6d8bbfd --- /dev/null +++ b/src/java/org/apache/cassandra/db/UnfilteredValidation.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.rows.Unfiltered; + +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.serializers.MarshalException; +import org.apache.cassandra.utils.NoSpamLogger; + + +/** + * Handles unfiltered validation - if configured, it checks if the provided unfiltered has + * invalid deletions (if the local deletion time is negative or if the ttl is negative) and + * then either logs or throws an exception if so. + */ +public class UnfilteredValidation +{ + private static final Logger logger = LoggerFactory.getLogger(UnfilteredValidation.class); + private static final NoSpamLogger nospam1m = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES); + + public static void maybeValidateUnfiltered(Unfiltered unfiltered, TableMetadata metadata, DecoratedKey key, SSTableReader sstable) + { + Config.CorruptedTombstoneStrategy strat = DatabaseDescriptor.getCorruptedTombstoneStrategy(); + if (strat != Config.CorruptedTombstoneStrategy.disabled && unfiltered != null && !unfiltered.isEmpty()) + { + boolean hasInvalidDeletions = false; + try + { + hasInvalidDeletions = unfiltered.hasInvalidDeletions(); + } + catch (Throwable t) // make sure no unknown exceptions fail the read/compaction + { + nospam1m.error("Could not check if Unfiltered in {} had any invalid deletions", sstable, t); + } + + if (hasInvalidDeletions) + { + String content; + try + { + content = unfiltered.toString(metadata, true); + } + catch (Throwable t) + { + content = "Could not get string representation: " + t.getMessage(); + } + handleInvalid(metadata, key, sstable, content); + } + } + } + + public static void handleInvalid(TableMetadata metadata, DecoratedKey key, SSTableReader sstable, String invalidContent) + { + Config.CorruptedTombstoneStrategy strat = DatabaseDescriptor.getCorruptedTombstoneStrategy(); + String keyString; + try + { + keyString = metadata.partitionKeyType.getString(key.getKey()); + } + catch (Throwable t) + { + keyString = "[corrupt token="+key.getToken()+"]"; + } + + if (strat == Config.CorruptedTombstoneStrategy.exception) + { + String msg = String.format("Key %s in %s.%s is invalid in %s: %s", + keyString, + metadata.keyspace, + metadata.name, + sstable, + invalidContent); + // we mark suspect to make sure this sstable is not included in future compactions - it would just keep + // throwing exceptions + sstable.markSuspect(); + throw new CorruptSSTableException(new MarshalException(msg), sstable.getFilename()); + } + else if (strat == Config.CorruptedTombstoneStrategy.warn) + { + String msgTemplate = String.format("Key {} in %s.%s is invalid in %s: {}", + metadata.keyspace, + metadata.name, + sstable); + nospam1m.warn(msgTemplate, keyString, invalidContent); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java index 9496878..443fe49 100644 --- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java @@ -111,6 +111,8 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator this.staticRow = Rows.EMPTY_STATIC_ROW; this.reader = createReader(indexEntry, file, shouldCloseFile); } + if (!partitionLevelDeletion.validate()) + UnfilteredValidation.handleInvalid(metadata(), key, sstable, "partitionLevelDeletion="+partitionLevelDeletion.toString()); if (reader != null && !slices.isEmpty()) reader.setForSlice(nextSlice()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java index 6b8a3ad..9346345 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java @@ -150,6 +150,7 @@ public class SSTableIterator extends AbstractSSTableIterator return null; Unfiltered next = deserializer.readNext(); + UnfilteredValidation.maybeValidateUnfiltered(next, metadata(), key, sstable); // We may get empty row for the same reason expressed on UnfilteredSerializer.deserializeOne. if (next.isEmpty()) continue; @@ -299,6 +300,7 @@ public class SSTableIterator extends AbstractSSTableIterator Unfiltered next = deserializer.readNext(); + UnfilteredValidation.maybeValidateUnfiltered(next, metadata(), key, sstable); // We may get empty row for the same reason expressed on UnfilteredSerializer.deserializeOne. if (next.isEmpty()) continue; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java index 9a30d19..9f449a0 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java @@ -223,6 +223,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator && !stopReadingDisk()) { Unfiltered unfiltered = deserializer.readNext(); + UnfilteredValidation.maybeValidateUnfiltered(unfiltered, metadata(), key, sstable); // We may get empty row for the same reason expressed on UnfilteredSerializer.deserializeOne. if (!unfiltered.isEmpty()) buffer.add(unfiltered); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/AbstractCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java index 4946a46..bfe7396 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java @@ -154,6 +154,13 @@ public abstract class AbstractCell extends Cell column().validateCell(this); } + public boolean hasInvalidDeletions() + { + if (ttl() < 0 || localDeletionTime() < 0 || (isExpiring() && localDeletionTime() == NO_DELETION_TIME)) + return true; + return false; + } + public long maxTimestamp() { return timestamp(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/AbstractRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java index 211b13f..24b088f 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java @@ -91,6 +91,18 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme cd.validate(); } + public boolean hasInvalidDeletions() + { + if (primaryKeyLivenessInfo().isExpiring() && (primaryKeyLivenessInfo().ttl() < 0 || primaryKeyLivenessInfo().localExpirationTime() < 0)) + return true; + if (!deletion().time().validate()) + return true; + for (ColumnData cd : this) + if (cd.hasInvalidDeletions()) + return true; + return false; + } + public String toString(TableMetadata metadata) { return toString(metadata, false); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/ColumnData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java index ccfcfa5..f2da132 100644 --- a/src/java/org/apache/cassandra/db/rows/ColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java @@ -66,6 +66,13 @@ public abstract class ColumnData public abstract void validate(); /** + * Validates the deletions (ttl and local deletion time) if any. + * + * @return true if it has any invalid deletions, false otherwise + */ + public abstract boolean hasInvalidDeletions(); + + /** * Adds the data to the provided digest. * * @param hasher the {@link Hasher} to add the data to. http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java index 57851d8..3073e5f 100644 --- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java @@ -138,6 +138,16 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell> cell.digest(hasher); } + public boolean hasInvalidDeletions() + { + if (!complexDeletion.validate()) + return true; + for (Cell cell : this) + if (cell.hasInvalidDeletions()) + return true; + return false; + } + public ComplexColumnData markCounterLocalToBeCleared() { return transformAndFilter(complexDeletion, Cell::markCounterLocalToBeCleared); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java index c0c6afd..094cf72 100644 --- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java @@ -68,6 +68,11 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker<Clus return false; } + public boolean hasInvalidDeletions() + { + return !deletionTime().validate(); + } + /** * The deletion time for the range tombstone this is a bound of. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java index 6e6c8cd..79d5b1a 100644 --- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java @@ -116,6 +116,11 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker<C return true; } + public boolean hasInvalidDeletions() + { + return !startDeletion.validate() || !endDeletion.validate(); + } + public RangeTombstoneBoundaryMarker copy(AbstractAllocator allocator) { return new RangeTombstoneBoundaryMarker(clustering().copy(allocator), endDeletion, startDeletion); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/Unfiltered.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java index 3a65f4e..81b63b7 100644 --- a/src/java/org/apache/cassandra/db/rows/Unfiltered.java +++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java @@ -55,6 +55,12 @@ public interface Unfiltered extends Clusterable */ public void validateData(TableMetadata metadata); + /** + * Do a quick validation of the deletions of the unfiltered (if any) + * + * @return true if any deletion is invalid + */ + public boolean hasInvalidDeletions(); public boolean isEmpty(); public String toString(TableMetadata metadata); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java index f9c6e82..a49e7b4 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java @@ -55,6 +55,8 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat try { DeletionTime partitionLevelDeletion = DeletionTime.serializer.deserialize(file); + if (!partitionLevelDeletion.validate()) + UnfilteredValidation.handleInvalid(sstable.metadata(), key, sstable, "partitionLevelDeletion="+partitionLevelDeletion.toString()); SerializationHelper helper = new SerializationHelper(sstable.metadata(), sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL); SSTableSimpleIterator iterator = SSTableSimpleIterator.create(sstable.metadata(), file, sstable.header, helper, partitionLevelDeletion); return new SSTableIdentityIterator(sstable, key, partitionLevelDeletion, file.getPath(), iterator); @@ -169,7 +171,9 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat protected Unfiltered doCompute() { - return iterator.next(); + Unfiltered unfiltered = iterator.next(); + UnfilteredValidation.maybeValidateUnfiltered(unfiltered, metadata(), key, sstable); + return unfiltered; } public void close() http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index ee7e445..2fade21 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -1729,6 +1729,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS isSuspect.getAndSet(true); } + @VisibleForTesting + public void unmarkSuspect() + { + isSuspect.getAndSet(false); + } + public boolean isMarkedSuspect() { return isSuspect.get(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 4214644..96fd63f 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -63,6 +63,7 @@ import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLog; @@ -5471,4 +5472,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { return AuditLogManager.getInstance().isAuditingEnabled(); } + + public String getCorruptedTombstoneStrategy() + { + return DatabaseDescriptor.getCorruptedTombstoneStrategy().toString(); + } + + public void setCorruptedTombstoneStrategy(String strategy) + { + DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.valueOf(strategy)); + logger.info("Setting corrupted tombstone strategy to {}", strategy); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index ab165b3..e54a95e 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -683,4 +683,6 @@ public interface StorageServiceMBean extends NotificationEmitter public void disableAuditLog(); public void enableAuditLog(String loggerName, String includedKeyspaces, String excludedKeyspaces, String includedCategories, String excludedCategories, String includedUsers, String excludedUsers) throws ConfigurationException; public boolean isAuditLogEnabled(); + public String getCorruptedTombstoneStrategy(); + public void setCorruptedTombstoneStrategy(String strategy); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/test/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index 640f9b3..5893bab 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -45,3 +45,4 @@ row_cache_size_in_mb: 16 enable_user_defined_functions: true enable_scripted_user_defined_functions: true prepared_statements_cache_size_mb: 1 +corrupted_tombstone_strategy: exception http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java index 4078e2a..68435a8 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java @@ -73,6 +73,7 @@ public class DatabaseDescriptorRefTest "org.apache.cassandra.config.Config$MemtableAllocationType", "org.apache.cassandra.config.Config$RepairCommandPoolFullStrategy", "org.apache.cassandra.config.Config$UserFunctionTimeoutPolicy", + "org.apache.cassandra.config.Config$CorruptedTombstoneStrategy", "org.apache.cassandra.config.ParameterizedClass", "org.apache.cassandra.config.EncryptionOptions", "org.apache.cassandra.config.EncryptionOptions$ClientEncryptionOptions", http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java index fc70974..99ca7dc 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java @@ -9,6 +9,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.Attributes; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.cql3.UntypedResultSet; @@ -19,6 +21,8 @@ import org.apache.cassandra.db.rows.AbstractCell; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.utils.FBUtilities; +import org.junit.After; +import org.junit.Before; import org.junit.Test; public class TTLTest extends CQLTester @@ -31,6 +35,18 @@ public class TTLTest extends CQLTester public static final String SIMPLE_CLUSTERING = "table2"; public static final String COMPLEX_NOCLUSTERING = "table3"; public static final String COMPLEX_CLUSTERING = "table4"; + private Config.CorruptedTombstoneStrategy corruptTombstoneStrategy; + @Before + public void before() + { + corruptTombstoneStrategy = DatabaseDescriptor.getCorruptedTombstoneStrategy(); + } + + @After + public void after() + { + DatabaseDescriptor.setCorruptedTombstoneStrategy(corruptTombstoneStrategy); + } @Test public void testTTLPerRequestLimit() throws Throwable @@ -167,9 +183,12 @@ public class TTLTest extends CQLTester @Test public void testRecoverOverflowedExpirationWithScrub() throws Throwable { + // this tests writes corrupt tombstones on purpose, disable the strategy: + DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.disabled); baseTestRecoverOverflowedExpiration(false, false); baseTestRecoverOverflowedExpiration(true, false); baseTestRecoverOverflowedExpiration(true, true); + // we reset the corrupted ts strategy after each test in @After above } public void testCapExpirationDateOverflowPolicy(ExpirationDateOverflowHandling.ExpirationDateOverflowPolicy policy) throws Throwable http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java index 4d5215e..ca420da 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java @@ -17,23 +17,38 @@ */ package org.apache.cassandra.db.compaction; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang.StringUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RangeTombstone; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.serializers.MarshalException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -42,6 +57,21 @@ public class CompactionsCQLTest extends CQLTester public static final int SLEEP_TIME = 5000; + private Config.CorruptedTombstoneStrategy strategy; + + @Before + public void before() + { + strategy = DatabaseDescriptor.getCorruptedTombstoneStrategy(); + } + + @After + public void after() + { + DatabaseDescriptor.setCorruptedTombstoneStrategy(DatabaseDescriptor.getCorruptedTombstoneStrategy()); + } + + @Test public void testTriggerMinorCompactionSTCS() throws Throwable { @@ -245,6 +275,189 @@ public class CompactionsCQLTest extends CQLTester testPerCFSNeverPurgeTombstonesHelper(false); } + @Test + public void testCompactionInvalidRTs() throws Throwable + { + // set the corruptedTombstoneStrategy to exception since these tests require it - if someone changed the default + // in test/conf/cassandra.yaml they would start failing + DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception); + prepare(); + // write a range tombstone with negative local deletion time (LDTs are not set by user and should not be negative): + RangeTombstone rt = new RangeTombstone(Slice.ALL, new DeletionTime(System.currentTimeMillis(), -1)); + RowUpdateBuilder rub = new RowUpdateBuilder(getCurrentColumnFamilyStore().metadata(), System.currentTimeMillis() * 1000, 22).clustering(33).addRangeTombstone(rt); + rub.build().apply(); + getCurrentColumnFamilyStore().forceBlockingFlush(); + compactAndValidate(); + readAndValidate(true); + readAndValidate(false); + } + + @Test + public void testCompactionInvalidTombstone() throws Throwable + { + DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception); + prepare(); + // write a standard tombstone with negative local deletion time (LDTs are not set by user and should not be negative): + RowUpdateBuilder rub = new RowUpdateBuilder(getCurrentColumnFamilyStore().metadata(), -1, System.currentTimeMillis() * 1000, 22).clustering(33).delete("b"); + rub.build().apply(); + getCurrentColumnFamilyStore().forceBlockingFlush(); + compactAndValidate(); + readAndValidate(true); + readAndValidate(false); + } + + @Test + public void testCompactionInvalidPartitionDeletion() throws Throwable + { + DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception); + prepare(); + // write a partition deletion with negative local deletion time (LDTs are not set by user and should not be negative):: + PartitionUpdate pu = PartitionUpdate.simpleBuilder(getCurrentColumnFamilyStore().metadata(), 22).nowInSec(-1).delete().build(); + new Mutation(pu).apply(); + getCurrentColumnFamilyStore().forceBlockingFlush(); + compactAndValidate(); + readAndValidate(true); + readAndValidate(false); + } + + @Test + public void testCompactionInvalidRowDeletion() throws Throwable + { + DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception); + prepare(); + // write a row deletion with negative local deletion time (LDTs are not set by user and should not be negative): + RowUpdateBuilder.deleteRowAt(getCurrentColumnFamilyStore().metadata(), System.currentTimeMillis() * 1000, -1, 22, 33).apply(); + getCurrentColumnFamilyStore().forceBlockingFlush(); + compactAndValidate(); + readAndValidate(true); + readAndValidate(false); + } + + private void prepare() throws Throwable + { + createTable("CREATE TABLE %s (id int, id2 int, b text, primary key (id, id2))"); + for (int i = 0; i < 2; i++) + execute("INSERT INTO %s (id, id2, b) VALUES (?, ?, ?)", i, i, String.valueOf(i)); + } + + @Test + public void testIndexedReaderRowDeletion() throws Throwable + { + // write enough data to make sure we use an IndexedReader when doing a read, and make sure it fails when reading a corrupt row deletion + DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception); + int maxSizePre = DatabaseDescriptor.getColumnIndexSize(); + DatabaseDescriptor.setColumnIndexSize(1024); + prepareWide(); + RowUpdateBuilder.deleteRowAt(getCurrentColumnFamilyStore().metadata(), System.currentTimeMillis() * 1000, -1, 22, 33).apply(); + getCurrentColumnFamilyStore().forceBlockingFlush(); + readAndValidate(true); + readAndValidate(false); + DatabaseDescriptor.setColumnIndexSize(maxSizePre); + } + + @Test + public void testIndexedReaderTombstone() throws Throwable + { + // write enough data to make sure we use an IndexedReader when doing a read, and make sure it fails when reading a corrupt standard tombstone + DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception); + int maxSizePre = DatabaseDescriptor.getColumnIndexSize(); + DatabaseDescriptor.setColumnIndexSize(1024); + prepareWide(); + RowUpdateBuilder rub = new RowUpdateBuilder(getCurrentColumnFamilyStore().metadata(), -1, System.currentTimeMillis() * 1000, 22).clustering(33).delete("b"); + rub.build().apply(); + getCurrentColumnFamilyStore().forceBlockingFlush(); + readAndValidate(true); + readAndValidate(false); + DatabaseDescriptor.setColumnIndexSize(maxSizePre); + } + + @Test + public void testIndexedReaderRT() throws Throwable + { + // write enough data to make sure we use an IndexedReader when doing a read, and make sure it fails when reading a corrupt range tombstone + DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception); + int maxSizePre = DatabaseDescriptor.getColumnIndexSize(); + DatabaseDescriptor.setColumnIndexSize(1024); + prepareWide(); + RangeTombstone rt = new RangeTombstone(Slice.ALL, new DeletionTime(System.currentTimeMillis(), -1)); + RowUpdateBuilder rub = new RowUpdateBuilder(getCurrentColumnFamilyStore().metadata(), System.currentTimeMillis() * 1000, 22).clustering(33).addRangeTombstone(rt); + rub.build().apply(); + getCurrentColumnFamilyStore().forceBlockingFlush(); + readAndValidate(true); + readAndValidate(false); + DatabaseDescriptor.setColumnIndexSize(maxSizePre); + } + + private void prepareWide() throws Throwable + { + createTable("CREATE TABLE %s (id int, id2 int, b text, primary key (id, id2))"); + for (int i = 0; i < 100; i++) + execute("INSERT INTO %s (id, id2, b) VALUES (?, ?, ?)", 22, i, StringUtils.repeat("ABCDEFG", 10)); + } + + private void compactAndValidate() + { + boolean gotException = false; + try + { + getCurrentColumnFamilyStore().forceMajorCompaction(); + } + catch(Throwable t) + { + gotException = true; + Throwable cause = t; + while (cause != null && !(cause instanceof MarshalException)) + cause = cause.getCause(); + assertNotNull(cause); + MarshalException me = (MarshalException) cause; + assertTrue(me.getMessage().contains(getCurrentColumnFamilyStore().metadata.keyspace+"."+getCurrentColumnFamilyStore().metadata.name)); + assertTrue(me.getMessage().contains("Key 22")); + } + assertTrue(gotException); + assertSuspectAndReset(getCurrentColumnFamilyStore().getLiveSSTables()); + } + + private void readAndValidate(boolean asc) throws Throwable + { + execute("select * from %s where id = 0 order by id2 "+(asc ? "ASC" : "DESC")); + + boolean gotException = false; + try + { + for (UntypedResultSet.Row r : execute("select * from %s")) {} + } + catch (Throwable t) + { + assertTrue(t instanceof CorruptSSTableException); + gotException = true; + Throwable cause = t; + while (cause != null && !(cause instanceof MarshalException)) + cause = cause.getCause(); + assertNotNull(cause); + MarshalException me = (MarshalException) cause; + assertTrue(me.getMessage().contains("Key 22")); + } + assertSuspectAndReset(getCurrentColumnFamilyStore().getLiveSSTables()); + assertTrue(gotException); + gotException = false; + try + { + execute("select * from %s where id = 22 order by id2 "+(asc ? "ASC" : "DESC")); + } + catch (Throwable t) + { + assertTrue(t instanceof CorruptSSTableException); + gotException = true; + Throwable cause = t; + while (cause != null && !(cause instanceof MarshalException)) + cause = cause.getCause(); + assertNotNull(cause); + MarshalException me = (MarshalException) cause; + assertTrue(me.getMessage().contains("Key 22")); + } + assertTrue(gotException); + assertSuspectAndReset(getCurrentColumnFamilyStore().getLiveSSTables()); + } public void testPerCFSNeverPurgeTombstonesHelper(boolean deletedCell) throws Throwable { @@ -281,6 +494,16 @@ public class CompactionsCQLTest extends CQLTester getCurrentColumnFamilyStore().truncateBlocking(); } + private void assertSuspectAndReset(Collection<SSTableReader> sstables) + { + assertFalse(sstables.isEmpty()); + for (SSTableReader sstable : sstables) + { + assertTrue(sstable.isMarkedSuspect()); + sstable.unmarkSuspect(); + } + } + private void assertTombstones(SSTableReader sstable, boolean expectTS) { boolean foundTombstone = false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java index 581109c..2510c5e 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java @@ -70,10 +70,14 @@ public class SSTableCorruptionDetectionTest extends SSTableWriterTestBase private static LifecycleTransaction txn; private static ColumnFamilyStore cfs; private static SSTableReader ssTableReader; + private static Config.CorruptedTombstoneStrategy original; @BeforeClass public static void setUp() { + // this test writes corrupted data on purpose, disable corrupted tombstone detection + original = DatabaseDescriptor.getCorruptedTombstoneStrategy(); + DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.disabled); TableMetadata.Builder cfm = TableMetadata.builder(keyspace, table) .addPartitionKeyColumn("pk", AsciiType.instance) @@ -127,6 +131,7 @@ public class SSTableCorruptionDetectionTest extends SSTableWriterTestBase txn.abort(); writer.close(); + DatabaseDescriptor.setCorruptedTombstoneStrategy(original); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org