Legacy deserializer can create empty range tombstones

patch by Sylvain Lebresne; reviewed by Branimir Lambov for CASSANDRA-13341
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/451fe9d8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/451fe9d8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/451fe9d8 Branch: refs/heads/trunk Commit: 451fe9d8ac567942f62852f542d28d7d1116f1a1 Parents: 6edc268 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Thu Mar 16 17:25:39 2017 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Wed Mar 29 13:17:58 2017 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/UnfilteredDeserializer.java | 124 ++++++++++++++----- .../db/rows/RangeTombstoneBoundMarker.java | 8 ++ .../db/rows/RangeTombstoneBoundaryMarker.java | 5 + .../cassandra/db/rows/RangeTombstoneMarker.java | 2 + .../cassandra/db/OldFormatDeserializerTest.java | 54 ++++++++ 6 files changed, 162 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/451fe9d8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b46eb50..c4293de 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.13 + * Legacy deserializer can create empty range tombstones (CASSANDRA-13341) * Use the Kernel32 library to retrieve the PID on Windows and fix startup checks (CASSANDRA-13333) * Fix code to not exchange schema across major versions (CASSANDRA-13274) * Dropping column results in "corrupt" SSTable (CASSANDRA-13337) http://git-wip-us.apache.org/repos/asf/cassandra/blob/451fe9d8/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java index 7bbbfdb..92690e1 100644 
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java +++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java @@ -26,8 +26,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.collect.PeekingIterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.util.DataInputPlus; @@ -43,8 +41,6 @@ import org.apache.cassandra.net.MessagingService; */ public abstract class UnfilteredDeserializer { - private static final Logger logger = LoggerFactory.getLogger(UnfilteredDeserializer.class); - protected final CFMetaData metadata; protected final DataInputPlus in; protected final SerializationHelper helper; @@ -433,21 +429,31 @@ public abstract class UnfilteredDeserializer { if (atoms.hasNext()) { + // If there is a range tombstone to open strictly before the next row/RT, we need to return that open (or boundary) marker first. + if (tombstoneTracker.hasOpeningMarkerBefore(atoms.peek())) + { + next = tombstoneTracker.popOpeningMarker(); + } // If a range tombstone closes strictly before the next row/RT, we need to return that close (or boundary) marker first. - if (tombstoneTracker.hasClosingMarkerBefore(atoms.peek())) + else if (tombstoneTracker.hasClosingMarkerBefore(atoms.peek())) { next = tombstoneTracker.popClosingMarker(); } else { LegacyLayout.LegacyAtom atom = atoms.next(); - if (!tombstoneTracker.isShadowed(atom)) - next = isRow(atom) ? 
readRow(atom) : tombstoneTracker.openNew(atom.asRangeTombstone()); + if (tombstoneTracker.isShadowed(atom)) + continue; + + if (isRow(atom)) + next = readRow(atom); + else + tombstoneTracker.openNew(atom.asRangeTombstone()); } } else if (tombstoneTracker.hasOpenTombstones()) { - next = tombstoneTracker.popClosingMarker(); + next = tombstoneTracker.popMarker(); } else { @@ -562,11 +568,31 @@ public abstract class UnfilteredDeserializer /** * Tracks which range tombstones are open when deserializing the old format. + * <p> + * This is a bit tricky because in the old of format we could have duplicated tombstones, overlapping ones, + * shadowed ones, etc.., but we should generate from that a "flat" output where at most one non-shadoowed + * range is open at any given time and without empty range. + * <p> + * One consequence of that is that we have to be careful to not generate markers too soon. For instance, + * we might get a range tombstone [1, 1]@3 followed by [1, 10]@5. So if we generate an opening marker on + * the first tombstone (so INCL_START(1)@3), we're screwed when we get to the 2nd range tombstone: we really + * should ignore the first tombstone in that that and generate INCL_START(1)@5 (assuming obviously we don't + * have one more range tombstone starting at 1 in the stream). This is why we have the + * {@link #hasOpeningMarkerBefore} method: in practice, we remember when a marker should be opened, but only + * generate that opening marker when we're sure that we won't get anything shadowing that marker. + * <p> + * For closing marker, we also have a {@link #hasClosingMarkerBefore} because in the old format the closing + * markers comes with the opening one, but we should generate them "in order" in the new format. */ private class TombstoneTracker { private final DeletionTime partitionDeletion; + // As explained in the javadoc, we need to wait to generate an opening marker until we're sure we have + // seen anything that could shadow it. 
So this remember a marker that needs to be opened but hasn't + // been yet. This is truly returned when hasOpeningMarkerBefore tells us it's safe to. + private RangeTombstoneMarker openMarkerToReturn; + // Open tombstones sorted by their closing bound (i.e. first tombstone is the first to close). // As we only track non-fully-shadowed ranges, the first range is necessarily the currently // open tombstone (the one with the higher timestamp). @@ -594,6 +620,23 @@ public abstract class UnfilteredDeserializer } /** + * Whether there is an outstanding opening marker that should be returned before we process the provided row/RT. + */ + public boolean hasOpeningMarkerBefore(LegacyLayout.LegacyAtom atom) + { + return openMarkerToReturn != null + && metadata.comparator.compare(openMarkerToReturn.openBound(false), atom.clustering()) < 0; + } + + public Unfiltered popOpeningMarker() + { + assert openMarkerToReturn != null; + Unfiltered toReturn = openMarkerToReturn; + openMarkerToReturn = null; + return toReturn; + } + + /** * Whether the currently open marker closes stricly before the provided row/RT. */ public boolean hasClosingMarkerBefore(LegacyLayout.LegacyAtom atom) @@ -622,35 +665,56 @@ public abstract class UnfilteredDeserializer return RangeTombstoneBoundaryMarker.makeBoundary(false, first.stop.bound, first.stop.bound.invert(), first.deletionTime, next.deletionTime); } + /** + * Pop whatever next marker needs to be popped. This should be called as many time as necessary (until + * {@link #hasOpenTombstones} returns {@false}) when all atoms have been consumed to "empty" the tracker. + */ + public Unfiltered popMarker() + { + assert hasOpenTombstones(); + return openMarkerToReturn == null ? popClosingMarker() : popOpeningMarker(); + } + /** - * Update the tracker given the provided newly open tombstone. 
This return the Unfiltered corresponding to the opening - * of said tombstone: this can be a simple open mark, a boundary (if there was an open tombstone superseded by this new one) - * or even null (if the new tombstone start is supersedes by the currently open tombstone). + * Update the tracker given the provided newly open tombstone. This potentially update openMarkerToReturn + * to account for th new opening. * - * Note that this method assume the added tombstone is not fully shadowed, i.e. that !isShadowed(tombstone). It also - * assumes no opened tombstone closes before that tombstone (so !hasClosingMarkerBefore(tombstone)). + * Note that this method assumes that: + + 1) the added tombstone is not fully shadowed: !isShadowed(tombstone). + + 2) there is no marker to open that open strictly before this new tombstone: !hasOpeningMarkerBefore(tombstone). + + 3) no opened tombstone closes before that tombstone: !hasClosingMarkerBefore(tombstone). + + One can check that this is only called after the condition above have been checked in UnfilteredIterator.hasNext above. */ - public Unfiltered openNew(LegacyLayout.LegacyRangeTombstone tombstone) + public void openNew(LegacyLayout.LegacyRangeTombstone tombstone) { if (openTombstones.isEmpty()) { + // If we have an openMarkerToReturn, the corresponding RT must be in openTombstones (or we wouldn't know when to close it) + assert openMarkerToReturn == null; openTombstones.add(tombstone); - return new RangeTombstoneBoundMarker(tombstone.start.bound, tombstone.deletionTime); + openMarkerToReturn = new RangeTombstoneBoundMarker(tombstone.start.bound, tombstone.deletionTime); + return; } - // Add the new tombstone, and then check if it changes the currently open deletion or not. - // Note: we grab the first tombstone (which represents the currently open deletion time) before adding - // because add() can remove that first. 
- Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator(); - LegacyLayout.LegacyRangeTombstone first = iter.next(); + if (openMarkerToReturn != null) + { + // If the new opening supersedes the one we're about to return, we need to update the one to return. + if (tombstone.deletionTime.supersedes(openMarkerToReturn.openDeletionTime(false))) + openMarkerToReturn = openMarkerToReturn.withNewOpeningDeletionTime(false, tombstone.deletionTime); + } + else + { + // We have no openMarkerToReturn set yet so set it now if needs be. + // Since openTombstones isn't empty, it means we have a currently ongoing deletion. And if the new tombstone + // supersedes that ongoing deletion, we need to close the opening deletion and open with the new one. + DeletionTime currentOpenDeletion = openTombstones.first().deletionTime; + if (tombstone.deletionTime.supersedes(currentOpenDeletion)) + openMarkerToReturn = RangeTombstoneBoundaryMarker.makeBoundary(false, tombstone.start.bound.invert(), tombstone.start.bound, currentOpenDeletion, tombstone.deletionTime); + } + // In all cases, we know !isShadowed(tombstone) so we need to add the tombstone (note however that we may not have set openMarkerToReturn if the + // new tombstone doesn't supersedes the current deletion _but_ extend past the marker currently open) add(tombstone); - - // If the newly opened tombstone superseds the currently open one, we have to produce a boundary to change - // the currently open deletion time, otherwise we have nothing to do. - return tombstone.deletionTime.supersedes(first.deletionTime) - ? 
RangeTombstoneBoundaryMarker.makeBoundary(false, tombstone.start.bound.invert(), tombstone.start.bound, first.deletionTime, tombstone.deletionTime) - : null; } /** @@ -680,16 +744,12 @@ public abstract class UnfilteredDeserializer public boolean hasOpenTombstones() { - return !openTombstones.isEmpty(); - } - - private boolean formBoundary(LegacyLayout.LegacyRangeTombstone close, LegacyLayout.LegacyRangeTombstone open) - { - return metadata.comparator.compare(close.stop.bound, open.start.bound) == 0; + return openMarkerToReturn != null || !openTombstones.isEmpty(); } public void clearState() { + openMarkerToReturn = null; openTombstones.clear(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/451fe9d8/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java index b35033d..6f4afa5 100644 --- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java @@ -124,6 +124,14 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker return new RangeTombstoneBoundMarker(clustering().copy(allocator), deletion); } + public RangeTombstoneBoundMarker withNewOpeningDeletionTime(boolean reversed, DeletionTime newDeletionTime) + { + if (!isOpen(reversed)) + throw new IllegalStateException(); + + return new RangeTombstoneBoundMarker(clustering(), newDeletionTime); + } + public void digest(MessageDigest digest) { bound.digest(digest); http://git-wip-us.apache.org/repos/asf/cassandra/blob/451fe9d8/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java 
b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java index 06fbf87..0683d76 100644 --- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java @@ -120,6 +120,11 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker return new RangeTombstoneBoundaryMarker(clustering().copy(allocator), endDeletion, startDeletion); } + public RangeTombstoneBoundaryMarker withNewOpeningDeletionTime(boolean reversed, DeletionTime newDeletionTime) + { + return new RangeTombstoneBoundaryMarker(clustering(), reversed ? newDeletionTime : endDeletion, reversed ? startDeletion : newDeletionTime); + } + public static RangeTombstoneBoundaryMarker makeBoundary(boolean reversed, Slice.Bound close, Slice.Bound open, DeletionTime closeDeletion, DeletionTime openDeletion) { assert RangeTombstone.Bound.Kind.compare(close.kind(), open.kind()) == 0 : "Both bound don't form a boundary"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/451fe9d8/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java index c4c9f7f..dee7231 100644 --- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java @@ -55,6 +55,8 @@ public interface RangeTombstoneMarker extends Unfiltered return false; } + public RangeTombstoneMarker withNewOpeningDeletionTime(boolean reversed, DeletionTime newDeletionTime); + /** * Utility class to help merging range tombstone markers coming from multiple inputs (UnfilteredRowIterators). 
* <p> http://git-wip-us.apache.org/repos/asf/cassandra/blob/451fe9d8/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java b/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java index 1060569..886b191 100644 --- a/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java +++ b/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java @@ -77,6 +77,60 @@ public class OldFormatDeserializerTest assertFalse(iterator.hasNext()); } + @Test + public void testRangeTombstonesSameStart() throws Exception + { + CFMetaData metadata = CFMetaData.Builder.create("ks", "table") + .withPartitioner(Murmur3Partitioner.instance) + .addPartitionKey("k", Int32Type.instance) + .addClusteringColumn("v", Int32Type.instance) + .build(); + + // Multiple RT that have the same start (we _can_ get this in the legacy format!) + Supplier<LegacyLayout.LegacyAtom> atomSupplier = supplier(rt(1, 2, 3), + rt(1, 2, 5), + rt(1, 5, 4)); + + UnfilteredIterator iterator = new UnfilteredIterator(metadata, + DeletionTime.LIVE, + new SerializationHelper(metadata, MessagingService.current_version, SerializationHelper.Flag.LOCAL), + atomSupplier); + + // We should be entirely ignoring the first tombston (shadowed by 2nd one) so we should generate + // [1, 2]@5 (2, 5]@4 (but where both range actually form a boundary) + + assertTrue(iterator.hasNext()); + + Unfiltered first = iterator.next(); + System.out.println(">> " + first.toString(metadata)); + assertTrue(first.isRangeTombstoneMarker()); + RangeTombstoneMarker start = (RangeTombstoneMarker)first; + assertTrue(start.isOpen(false)); + assertFalse(start.isClose(false)); + assertEquals(1, toInt(start.openBound(false))); + assertEquals(5, start.openDeletionTime(false).markedForDeleteAt()); + + Unfiltered second = iterator.next(); + assertTrue(second.isRangeTombstoneMarker()); + 
RangeTombstoneMarker middle = (RangeTombstoneMarker)second; + assertTrue(middle.isClose(false)); + assertTrue(middle.isOpen(false)); + assertEquals(2, toInt(middle.closeBound(false))); + assertEquals(2, toInt(middle.openBound(false))); + assertEquals(5, middle.closeDeletionTime(false).markedForDeleteAt()); + assertEquals(4, middle.openDeletionTime(false).markedForDeleteAt()); + + Unfiltered third = iterator.next(); + assertTrue(third.isRangeTombstoneMarker()); + RangeTombstoneMarker end = (RangeTombstoneMarker)third; + assertTrue(end.isClose(false)); + assertFalse(end.isOpen(false)); + assertEquals(5, toInt(end.closeBound(false))); + assertEquals(4, end.closeDeletionTime(false).markedForDeleteAt()); + + assertFalse(iterator.hasNext()); + } + private static int toInt(ClusteringPrefix prefix) { assertTrue(prefix.size() == 1);