Legacy deserializer can create unexpected boundary range tombstones. Patch by Sylvain Lebresne; reviewed by Branimir Lambov for CASSANDRA-13237.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ab717484 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ab717484 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ab717484 Branch: refs/heads/cassandra-3.11 Commit: ab7174849599c62f4bef3cb719c644bae13e9321 Parents: 42977db Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Thu Feb 23 14:32:03 2017 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Thu Feb 23 14:32:34 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/UnfilteredDeserializer.java | 343 ++++++++++--------- .../cassandra/db/rows/RangeTombstoneMarker.java | 2 +- .../apache/cassandra/service/DataResolver.java | 31 +- .../cassandra/db/OldFormatDeserializerTest.java | 110 ++++++ .../cassandra/service/DataResolverTest.java | 129 ++++++- 6 files changed, 436 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e978a5c..386029e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.12 + * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237) * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070) * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185) Merged from 2.2 http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java index a2d51e13..42a806a 100644 --- 
a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java +++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java @@ -20,7 +20,9 @@ package org.apache.cassandra.db; import java.io.IOException; import java.io.IOError; import java.util.*; +import java.util.function.Supplier; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.collect.PeekingIterator; @@ -265,11 +267,23 @@ public abstract class UnfilteredDeserializer boolean readAllAsDynamic) { super(metadata, in, helper); - this.iterator = new UnfilteredIterator(partitionDeletion); + this.iterator = new UnfilteredIterator(metadata, partitionDeletion, helper, this::readAtom); this.readAllAsDynamic = readAllAsDynamic; this.lastConsumedPosition = currentPosition(); } + private LegacyLayout.LegacyAtom readAtom() + { + try + { + return LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic); + } + catch (IOException e) + { + throw new IOError(e); + } + } + public void setSkipStatic() { this.skipStatic = true; @@ -317,15 +331,6 @@ public abstract class UnfilteredDeserializer } } - private boolean isRow(LegacyLayout.LegacyAtom atom) - { - if (atom.isCell()) - return true; - - LegacyLayout.LegacyRangeTombstone tombstone = atom.asRangeTombstone(); - return tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata); - } - public int compareNextTo(Slice.Bound bound) throws IOException { if (!hasNext()) @@ -389,19 +394,36 @@ public abstract class UnfilteredDeserializer // Groups atoms from the input into proper Unfiltered. // Note: this could use guava AbstractIterator except that we want to be able to clear // the internal state of the iterator so it's cleaner to do it ourselves. 
- private class UnfilteredIterator implements PeekingIterator<Unfiltered> + @VisibleForTesting + static class UnfilteredIterator implements PeekingIterator<Unfiltered> { private final AtomIterator atoms; private final LegacyLayout.CellGrouper grouper; private final TombstoneTracker tombstoneTracker; + private final CFMetaData metadata; + private final SerializationHelper helper; private Unfiltered next; - private UnfilteredIterator(DeletionTime partitionDeletion) + UnfilteredIterator(CFMetaData metadata, + DeletionTime partitionDeletion, + SerializationHelper helper, + Supplier<LegacyLayout.LegacyAtom> atomReader) { + this.metadata = metadata; + this.helper = helper; this.grouper = new LegacyLayout.CellGrouper(metadata, helper); this.tombstoneTracker = new TombstoneTracker(partitionDeletion); - this.atoms = new AtomIterator(tombstoneTracker); + this.atoms = new AtomIterator(atomReader); + } + + private boolean isRow(LegacyLayout.LegacyAtom atom) + { + if (atom.isCell()) + return true; + + LegacyLayout.LegacyRangeTombstone tombstone = atom.asRangeTombstone(); + return tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata); } public boolean hasNext() @@ -478,195 +500,200 @@ public abstract class UnfilteredDeserializer { throw new UnsupportedOperationException(); } - } - - // Wraps the input of the deserializer to provide an iterator (and skip shadowed atoms). - // Note: this could use guava AbstractIterator except that we want to be able to clear - // the internal state of the iterator so it's cleaner to do it ourselves. - private class AtomIterator implements PeekingIterator<LegacyLayout.LegacyAtom> - { - private final TombstoneTracker tombstoneTracker; - private boolean isDone; - private LegacyLayout.LegacyAtom next; - private AtomIterator(TombstoneTracker tombstoneTracker) + // Wraps the input of the deserializer to provide an iterator (and skip shadowed atoms). 
+ // Note: this could use guava AbstractIterator except that we want to be able to clear + // the internal state of the iterator so it's cleaner to do it ourselves. + private class AtomIterator implements PeekingIterator<LegacyLayout.LegacyAtom> { - this.tombstoneTracker = tombstoneTracker; - } + private final Supplier<LegacyLayout.LegacyAtom> atomReader; + private boolean isDone; + private LegacyLayout.LegacyAtom next; - public boolean hasNext() - { - if (isDone) - return false; + private AtomIterator(Supplier<LegacyLayout.LegacyAtom> atomReader) + { + this.atomReader = atomReader; + } - if (next == null) + public boolean hasNext() { - next = readAtom(); + if (isDone) + return false; + if (next == null) { - isDone = true; - return false; + next = atomReader.get(); + if (next == null) + { + isDone = true; + return false; + } } + return true; } - return true; - } - private LegacyLayout.LegacyAtom readAtom() - { - try + public LegacyLayout.LegacyAtom next() { - return LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic); + if (!hasNext()) + throw new UnsupportedOperationException(); + LegacyLayout.LegacyAtom toReturn = next; + next = null; + return toReturn; } - catch (IOException e) + + public LegacyLayout.LegacyAtom peek() { - throw new IOError(e); + if (!hasNext()) + throw new UnsupportedOperationException(); + return next; } - } - public LegacyLayout.LegacyAtom next() - { - if (!hasNext()) - throw new UnsupportedOperationException(); - LegacyLayout.LegacyAtom toReturn = next; - next = null; - return toReturn; - } + public void clearState() + { + this.next = null; + this.isDone = false; + } - public LegacyLayout.LegacyAtom peek() - { - if (!hasNext()) + public void remove() + { throw new UnsupportedOperationException(); - return next; + } } - public void clearState() + /** + * Tracks which range tombstones are open when deserializing the old format. 
+ */ + private class TombstoneTracker { - this.next = null; - this.isDone = false; - } + private final DeletionTime partitionDeletion; - public void remove() - { - throw new UnsupportedOperationException(); - } - } + // Open tombstones sorted by their closing bound (i.e. first tombstone is the first to close). + // As we only track non-fully-shadowed ranges, the first range is necessarily the currently + // open tombstone (the one with the higher timestamp). + private final SortedSet<LegacyLayout.LegacyRangeTombstone> openTombstones; - /** - * Tracks which range tombstones are open when deserializing the old format. - */ - private class TombstoneTracker - { - private final DeletionTime partitionDeletion; + public TombstoneTracker(DeletionTime partitionDeletion) + { + this.partitionDeletion = partitionDeletion; + this.openTombstones = new TreeSet<>((rt1, rt2) -> metadata.comparator.compare(rt1.stop.bound, rt2.stop.bound)); + } - // Open tombstones sorted by their closing bound (i.e. first tombstone is the first to close). - // As we only track non-fully-shadowed ranges, the first range is necessarily the currently - // open tombstone (the one with the higher timestamp). - private final SortedSet<LegacyLayout.LegacyRangeTombstone> openTombstones; + /** + * Checks if the provided atom is fully shadowed by the open tombstones of this tracker (or the partition deletion). + */ + public boolean isShadowed(LegacyLayout.LegacyAtom atom) + { + assert !hasClosingMarkerBefore(atom); + long timestamp = atom.isCell() ? 
atom.asCell().timestamp : atom.asRangeTombstone().deletionTime.markedForDeleteAt(); - public TombstoneTracker(DeletionTime partitionDeletion) - { - this.partitionDeletion = partitionDeletion; - this.openTombstones = new TreeSet<>((rt1, rt2) -> metadata.comparator.compare(rt1.stop.bound, rt2.stop.bound)); - } + if (partitionDeletion.deletes(timestamp)) + return true; - /** - * Checks if the provided atom is fully shadowed by the open tombstones of this tracker (or the partition deletion). - */ - public boolean isShadowed(LegacyLayout.LegacyAtom atom) - { - assert !hasClosingMarkerBefore(atom); - long timestamp = atom.isCell() ? atom.asCell().timestamp : atom.asRangeTombstone().deletionTime.markedForDeleteAt(); + SortedSet<LegacyLayout.LegacyRangeTombstone> coveringTombstones = isRow(atom) ? openTombstones : openTombstones.tailSet(atom.asRangeTombstone()); + return Iterables.any(coveringTombstones, tombstone -> tombstone.deletionTime.deletes(timestamp)); + } - if (partitionDeletion.deletes(timestamp)) - return true; + /** + * Whether the currently open marker closes stricly before the provided row/RT. + */ + public boolean hasClosingMarkerBefore(LegacyLayout.LegacyAtom atom) + { + return !openTombstones.isEmpty() + && metadata.comparator.compare(openTombstones.first().stop.bound, atom.clustering()) < 0; + } - SortedSet<LegacyLayout.LegacyRangeTombstone> coveringTombstones = isRow(atom) ? openTombstones : openTombstones.tailSet(atom.asRangeTombstone()); - return Iterables.any(coveringTombstones, tombstone -> tombstone.deletionTime.deletes(timestamp)); - } + /** + * Returns the unfiltered corresponding to closing the currently open marker (and update the tracker accordingly). + */ + public Unfiltered popClosingMarker() + { + assert !openTombstones.isEmpty(); - /** - * Whether the currently open marker closes stricly before the provided row/RT. 
- */ - public boolean hasClosingMarkerBefore(LegacyLayout.LegacyAtom atom) - { - return !openTombstones.isEmpty() - && metadata.comparator.compare(openTombstones.first().stop.bound, atom.clustering()) < 0; - } + Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator(); + LegacyLayout.LegacyRangeTombstone first = iter.next(); + iter.remove(); - /** - * Returns the unfiltered corresponding to closing the currently open marker (and update the tracker accordingly). - */ - public Unfiltered popClosingMarker() - { - assert !openTombstones.isEmpty(); + // If that was the last open tombstone, we just want to close it. Otherwise, we have a boundary with the + // next tombstone + if (!iter.hasNext()) + return new RangeTombstoneBoundMarker(first.stop.bound, first.deletionTime); - Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator(); - LegacyLayout.LegacyRangeTombstone first = iter.next(); - iter.remove(); + LegacyLayout.LegacyRangeTombstone next = iter.next(); + return RangeTombstoneBoundaryMarker.makeBoundary(false, first.stop.bound, first.stop.bound.invert(), first.deletionTime, next.deletionTime); + } - // If that was the last open tombstone, we just want to close it. Otherwise, we have a boundary with the - // next tombstone - if (!iter.hasNext()) - return new RangeTombstoneBoundMarker(first.stop.bound, first.deletionTime); + /** + * Update the tracker given the provided newly open tombstone. This return the Unfiltered corresponding to the opening + * of said tombstone: this can be a simple open mark, a boundary (if there was an open tombstone superseded by this new one) + * or even null (if the new tombstone start is supersedes by the currently open tombstone). + * + * Note that this method assume the added tombstone is not fully shadowed, i.e. that !isShadowed(tombstone). It also + * assumes no opened tombstone closes before that tombstone (so !hasClosingMarkerBefore(tombstone)). 
+ */ + public Unfiltered openNew(LegacyLayout.LegacyRangeTombstone tombstone) + { + if (openTombstones.isEmpty()) + { + openTombstones.add(tombstone); + return new RangeTombstoneBoundMarker(tombstone.start.bound, tombstone.deletionTime); + } - LegacyLayout.LegacyRangeTombstone next = iter.next(); - return RangeTombstoneBoundaryMarker.makeBoundary(false, first.stop.bound, first.stop.bound.invert(), first.deletionTime, next.deletionTime); - } + // Add the new tombstone, and then check if it changes the currently open deletion or not. + // Note: we grab the first tombstone (which represents the currently open deletion time) before adding + // because add() can remove that first. + Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator(); + LegacyLayout.LegacyRangeTombstone first = iter.next(); - /** - * Update the tracker given the provided newly open tombstone. This return the Unfiltered corresponding to the opening - * of said tombstone: this can be a simple open mark, a boundary (if there was an open tombstone superseded by this new one) - * or even null (if the new tombston start is supersedes by the currently open tombstone). - * - * Note that this method assume the added tombstone is not fully shadowed, i.e. that !isShadowed(tombstone). It also - * assumes no opened tombstone closes before that tombstone (so !hasClosingMarkerBefore(tombstone)). - */ - public Unfiltered openNew(LegacyLayout.LegacyRangeTombstone tombstone) - { - if (openTombstones.isEmpty()) - { - openTombstones.add(tombstone); - return new RangeTombstoneBoundMarker(tombstone.start.bound, tombstone.deletionTime); + add(tombstone); + + // If the newly opened tombstone superseds the currently open one, we have to produce a boundary to change + // the currently open deletion time, otherwise we have nothing to do. + return tombstone.deletionTime.supersedes(first.deletionTime) + ? 
RangeTombstoneBoundaryMarker.makeBoundary(false, tombstone.start.bound.invert(), tombstone.start.bound, first.deletionTime, tombstone.deletionTime) + : null; } - Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator(); - LegacyLayout.LegacyRangeTombstone first = iter.next(); - if (tombstone.deletionTime.supersedes(first.deletionTime)) + /** + * Adds a new tombstone to openTombstones, removing anything that would be shadowed by this new tombstone. + */ + private void add(LegacyLayout.LegacyRangeTombstone tombstone) { - // We're supperseding the currently open tombstone, so we should produce a boundary that close the currently open - // one and open the new one. We should also add the tombstone, but if it stop after the first one, we should - // also remove that first tombstone as it won't be useful anymore. - if (metadata.comparator.compare(tombstone.stop.bound, first.stop.bound) >= 0) - iter.remove(); + // First, remove existing tombstone that is shadowed by this tombstone. + Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator(); + while (iter.hasNext()) + { + LegacyLayout.LegacyRangeTombstone existing = iter.next(); + // openTombstones is ordered by stop bound and the new tombstone can't be shadowing anything that + // stop after it. + if (metadata.comparator.compare(tombstone.stop.bound, existing.stop.bound) < 0) + break; + + // Note that we remove an existing tombstone even if it is equal to the new one because in that case, + // either the existing strictly stops before the new one and we don't want it, or it stops exactly + // like the new one but we're going to inconditionally add the new one anyway. 
+ if (!existing.deletionTime.supersedes(tombstone.deletionTime)) + iter.remove(); + } openTombstones.add(tombstone); - return RangeTombstoneBoundaryMarker.makeBoundary(false, tombstone.start.bound.invert(), tombstone.start.bound, first.deletionTime, tombstone.deletionTime); } - else + + public boolean hasOpenTombstones() { - // If the new tombstone don't supersedes the currently open tombstone, we don't have anything to return, we - // just add the new tombstone (because we know tombstone is not fully shadowed, this imply the new tombstone - // simply extend after the first one and we'll deal with it later) - assert metadata.comparator.compare(tombstone.start.bound, first.stop.bound) <= 0; - openTombstones.add(tombstone); - return null; + return !openTombstones.isEmpty(); } - } - public boolean hasOpenTombstones() - { - return !openTombstones.isEmpty(); - } - - private boolean formBoundary(LegacyLayout.LegacyRangeTombstone close, LegacyLayout.LegacyRangeTombstone open) - { - return metadata.comparator.compare(close.stop.bound, open.start.bound) == 0; - } + private boolean formBoundary(LegacyLayout.LegacyRangeTombstone close, LegacyLayout.LegacyRangeTombstone open) + { + return metadata.comparator.compare(close.stop.bound, open.start.bound) == 0; + } - public void clearState() - { - openTombstones.clear(); + public void clearState() + { + openTombstones.clear(); + } } } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java index 5771a86..1cd5fb4 100644 --- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java @@ -27,7 +27,7 @@ import org.apache.cassandra.utils.memory.AbstractAllocator; /** * A 
marker for a range tombstone bound. * <p> - * There is 2 types of markers: bounds (see {@link RangeTombstoneBound}) and boundaries (see {@link RangeTombstoneBoundary}). + * There is 2 types of markers: bounds (see {@link RangeTombstoneBoundMarker}) and boundaries (see {@link RangeTombstoneBoundaryMarker}). */ public interface RangeTombstoneMarker extends Unfiltered { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index 01953e1..60cfbba 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -297,26 +297,33 @@ public class DataResolver extends ResponseResolver // active after that point. Further whatever deletion was open or is open by this marker on the // source, that deletion cannot supersedes the current one. // - // What we want to know here is if the source deletion and merged deletion was or will be equal, - // because in that case we don't want to include any repair for the source, and otherwise we do. + // But while the marker deletion (before and/or after this point) cannot supersed the current + // deletion, we want to know if it's equal to it (both before and after), because in that case + // the source is up to date and we don't want to include repair. // - // Note further that if the marker is a boundary, as both side of that boundary will have a - // different deletion time, only one side might be equal to the merged deletion. This means we - // can only be in one of 2 cases: - // 1) the source was up-to-date on deletion up to that point (markerToRepair[i] == null), and then - // it won't be from that point on. 
+ // So in practice we have 2 possible case: + // 1) the source was up-to-date on deletion up to that point (markerToRepair[i] == null). Then + // it won't be from that point on unless it's a boundary and the new opened deletion time + // is also equal to the current deletion (note that this implies the boundary has the same + // closing and opening deletion time, which should generally not happen, but can due to legacy + // reading code not avoiding this for a while, see CASSANDRA-13237). // 2) the source wasn't up-to-date on deletion up to that point (markerToRepair[i] != null), and // it may now be (if it isn't we just have nothing to do for that marker). - assert !currentDeletion.isLive(); + assert !currentDeletion.isLive() : currentDeletion.toString(); if (markerToRepair[i] == null) { // Since there is an ongoing merged deletion, the only way we don't have an open repair for // this source is that it had a range open with the same deletion as current and it's - // closing it. This imply we need to open a deletion for the source from that point. - assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed)); - assert !marker.isOpen(isReversed) || currentDeletion.supersedes(marker.openDeletionTime(isReversed)); - markerToRepair[i] = marker.closeBound(isReversed).invert(); + // closing it. + assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed)) + : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata())); + + // and so unless it's a boundary whose opening deletion time is still equal to the current + // deletion (see comment above for why this can actually happen), we have to repair the source + // from that point on. 
+ if (!(marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))) + markerToRepair[i] = marker.closeBound(isReversed).invert(); } // In case 2) above, we only have something to do if the source is up-to-date after that point else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed))) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java b/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java new file mode 100644 index 0000000..1060569 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db; + +import java.util.function.Supplier; + +import org.junit.Test; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.UnfilteredDeserializer.OldFormatDeserializer.UnfilteredIterator; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.SerializationHelper; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +import static org.junit.Assert.*; + +public class OldFormatDeserializerTest +{ + @Test + public void testRangeTombstones() throws Exception + { + CFMetaData metadata = CFMetaData.Builder.create("ks", "table") + .withPartitioner(Murmur3Partitioner.instance) + .addPartitionKey("k", Int32Type.instance) + .addClusteringColumn("v", Int32Type.instance) + .build(); + + Supplier<LegacyLayout.LegacyAtom> atomSupplier = supplier(rt(0, 10, 42), + rt(5, 15, 42)); + + UnfilteredIterator iterator = new UnfilteredIterator(metadata, + DeletionTime.LIVE, + new SerializationHelper(metadata, MessagingService.current_version, SerializationHelper.Flag.LOCAL), + atomSupplier); + + // As the deletion time are the same, we want this to produce a single range tombstone covering from 0 to 15. 
+ + assertTrue(iterator.hasNext()); + + Unfiltered first = iterator.next(); + assertTrue(first.isRangeTombstoneMarker()); + RangeTombstoneMarker start = (RangeTombstoneMarker)first; + assertTrue(start.isOpen(false)); + assertFalse(start.isClose(false)); + assertEquals(0, toInt(start.openBound(false))); + assertEquals(42, start.openDeletionTime(false).markedForDeleteAt()); + + Unfiltered second = iterator.next(); + assertTrue(second.isRangeTombstoneMarker()); + RangeTombstoneMarker end = (RangeTombstoneMarker)second; + assertTrue(end.isClose(false)); + assertFalse(end.isOpen(false)); + assertEquals(15, toInt(end.closeBound(false))); + assertEquals(42, end.closeDeletionTime(false).markedForDeleteAt()); + + assertFalse(iterator.hasNext()); + } + + private static int toInt(ClusteringPrefix prefix) + { + assertTrue(prefix.size() == 1); + return ByteBufferUtil.toInt(prefix.get(0)); + } + + private static Supplier<LegacyLayout.LegacyAtom> supplier(LegacyLayout.LegacyAtom... atoms) + { + return new Supplier<LegacyLayout.LegacyAtom>() + { + int i = 0; + + public LegacyLayout.LegacyAtom get() + { + return i >= atoms.length ? null : atoms[i++]; + } + }; + } + + private static LegacyLayout.LegacyAtom rt(int start, int end, int deletion) + { + return new LegacyLayout.LegacyRangeTombstone(bound(start, true), bound(end, false), new DeletionTime(deletion, FBUtilities.nowInSeconds())); + } + + private static LegacyLayout.LegacyBound bound(int b, boolean isStart) + { + return new LegacyLayout.LegacyBound(isStart ? 
Slice.Bound.inclusiveStartOf(ByteBufferUtil.bytes(b)) : Slice.Bound.inclusiveEndOf(ByteBufferUtil.bytes(b)), + false, + null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/test/unit/org/apache/cassandra/service/DataResolverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java index fd1e54e..2f72093 100644 --- a/test/unit/org/apache/cassandra/service/DataResolverTest.java +++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java @@ -554,6 +554,73 @@ public class DataResolverTest assertRepairContainsDeletions(msg2, null, one_two, withExclusiveEndIf(three_four, timestamp2 >= timestamp1), five_six); } + /** + * Test cases where a boundary of a source is covered by another source deletion and timestamp on one or both side + * of the boundary are equal to the "merged" deletion. + * This is a test for CASSANDRA-13237 to make sure we handle this case properly. + */ + @Test + public void testRepairRangeTombstoneBoundary() throws UnknownHostException + { + testRepairRangeTombstoneBoundary(1, 0, 1); + messageRecorder.sent.clear(); + testRepairRangeTombstoneBoundary(1, 1, 0); + messageRecorder.sent.clear(); + testRepairRangeTombstoneBoundary(1, 1, 1); + } + + /** + * Test for CASSANDRA-13237, checking we don't fail (and handle correctly) the case where a RT boundary has the + * same deletion on both side (while is useless but could be created by legacy code pre-CASSANDRA-13237 and could + * thus still be sent). 
+ */ + public void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException + { + DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2); + InetAddress peer1 = peer(); + InetAddress peer2 = peer(); + + // 1st "stream" + RangeTombstone one_nine = tombstone("0", true , "9", true, timestamp1, nowInSec); + UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk) + .addRangeTombstone(one_nine) + .buildUpdate()); + + // 2nd "stream" (build more manually to ensure we have the boundary we want) + RangeTombstoneBoundMarker open_one = marker("0", true, true, timestamp2, nowInSec); + RangeTombstoneBoundaryMarker boundary_five = boundary("5", false, timestamp2, nowInSec, timestamp3, nowInSec); + RangeTombstoneBoundMarker close_nine = marker("9", false, true, timestamp3, nowInSec); + UnfilteredPartitionIterator iter2 = iter(dk, open_one, boundary_five, close_nine); + + resolver.preprocess(readResponseMessage(peer1, iter1)); + resolver.preprocess(readResponseMessage(peer2, iter2)); + + boolean shouldHaveRepair = timestamp1 != timestamp2 || timestamp1 != timestamp3; + + // No results, we've only reconciled tombstones. + try (PartitionIterator data = resolver.resolve()) + { + assertFalse(data.hasNext()); + assertRepairFuture(resolver, shouldHaveRepair ? 1 : 0); + } + + assertEquals(shouldHaveRepair? 1 : 0, messageRecorder.sent.size()); + + if (!shouldHaveRepair) + return; + + MessageOut msg = getSentMessage(peer2); + assertRepairMetadata(msg); + assertRepairContainsNoColumns(msg); + + RangeTombstone expected = timestamp1 != timestamp2 + // We've repaired the 1st part + ? 
tombstone("0", true, "5", false, timestamp1, nowInSec) + // We've repaired the 2nd part + : tombstone("5", true, "9", true, timestamp1, nowInSec); + assertRepairContainsDeletions(msg, null, expected); + } + // Forces the start to be exclusive if the condition holds private static RangeTombstone withExclusiveStartIf(RangeTombstone rt, boolean condition) { @@ -873,18 +940,40 @@ public class DataResolverTest private RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime) { - RangeTombstone.Bound.Kind startKind = inclusiveStart - ? Kind.INCL_START_BOUND - : Kind.EXCL_START_BOUND; - RangeTombstone.Bound.Kind endKind = inclusiveEnd - ? Kind.INCL_END_BOUND - : Kind.EXCL_END_BOUND; - - RangeTombstone.Bound startBound = new RangeTombstone.Bound(startKind, cfm.comparator.make(start).getRawValues()); - RangeTombstone.Bound endBound = new RangeTombstone.Bound(endKind, cfm.comparator.make(end).getRawValues()); + RangeTombstone.Bound startBound = rtBound(start, true, inclusiveStart); + RangeTombstone.Bound endBound = rtBound(end, false, inclusiveEnd); return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime)); } + private RangeTombstone.Bound rtBound(Object value, boolean isStart, boolean inclusive) + { + RangeTombstone.Bound.Kind kind = isStart + ? (inclusive ? Kind.INCL_START_BOUND : Kind.EXCL_START_BOUND) + : (inclusive ? Kind.INCL_END_BOUND : Kind.EXCL_END_BOUND); + + return new RangeTombstone.Bound(kind, cfm.comparator.make(value).getRawValues()); + } + + private RangeTombstone.Bound rtBoundary(Object value, boolean inclusiveOnEnd) + { + RangeTombstone.Bound.Kind kind = inclusiveOnEnd + ? 
Kind.INCL_END_EXCL_START_BOUNDARY + : Kind.EXCL_END_INCL_START_BOUNDARY; + return new RangeTombstone.Bound(kind, cfm.comparator.make(value).getRawValues()); + } + + private RangeTombstoneBoundMarker marker(Object value, boolean isStart, boolean inclusive, long markedForDeleteAt, int localDeletionTime) + { + return new RangeTombstoneBoundMarker(rtBound(value, isStart, inclusive), new DeletionTime(markedForDeleteAt, localDeletionTime)); + } + + private RangeTombstoneBoundaryMarker boundary(Object value, boolean inclusiveOnEnd, long markedForDeleteAt1, int localDeletionTime1, long markedForDeleteAt2, int localDeletionTime2) + { + return new RangeTombstoneBoundaryMarker(rtBoundary(value, inclusiveOnEnd), + new DeletionTime(markedForDeleteAt1, localDeletionTime1), + new DeletionTime(markedForDeleteAt2, localDeletionTime2)); + } + private UnfilteredPartitionIterator fullPartitionDelete(CFMetaData cfm, DecoratedKey dk, long timestamp, int nowInSec) { return new SingletonUnfilteredPartitionIterator(PartitionUpdate.fullPartitionDelete(cfm, dk, timestamp, nowInSec).unfilteredIterator(), false); @@ -909,4 +998,26 @@ public class DataResolverTest { return new SingletonUnfilteredPartitionIterator(update.unfilteredIterator(), false); } + + private UnfilteredPartitionIterator iter(DecoratedKey key, Unfiltered... unfiltereds) + { + SortedSet<Unfiltered> s = new TreeSet<>(cfm.comparator); + Collections.addAll(s, unfiltereds); + final Iterator<Unfiltered> iterator = s.iterator(); + + UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(cfm, + key, + DeletionTime.LIVE, + cfm.partitionColumns(), + Rows.EMPTY_STATIC_ROW, + false, + EncodingStats.NO_STATS) + { + protected Unfiltered computeNext() + { + return iterator.hasNext() ? iterator.next() : endOfData(); + } + }; + return new SingletonUnfilteredPartitionIterator(rowIter, false); + } }