Repository: cassandra Updated Branches: refs/heads/trunk be2cf1afa -> 65000b3c2
Don't account for unconsumed data when checking if past sstable index patch by slebresne; reviewed by blambov for CASSANDRA-10590 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4a00438a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4a00438a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4a00438a Branch: refs/heads/trunk Commit: 4a00438a27cfa5bcd29cdcd9b30eb6d69e836a1f Parents: 242b973 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Wed Oct 28 12:21:36 2015 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Thu Nov 5 11:07:09 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/UnfilteredDeserializer.java | 45 +++++++++++++++++++- .../columniterator/AbstractSSTableIterator.java | 4 +- .../columniterator/SSTableReversedIterator.java | 17 ++------ 4 files changed, 51 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a00438a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5fdeae5..1ff2fdb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Fix reading of legacy sstables (CASSANDRA-10590) * Use CQL type names in schema metadata tables (CASSANDRA-10365) * Guard batchlog replay against integer division by zero (CASSANDRA-9223) * Fix bug when adding a column to thrift with the same name than a primary key (CASSANDRA-10608) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a00438a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java index 52de159..66f6b71 100644 
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java +++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.net.MessagingService; /** @@ -107,6 +108,20 @@ public abstract class UnfilteredDeserializer */ public abstract void skipNext() throws IOException; + + /** + * For the legacy layout deserializer, we have to deal with the fact that a row can span multiple index blocks and that + * the call to hasNext() reads the next element upfront. We must take that into account when we check in AbstractSSTableIterator if + * we're past the end of an index block boundary, as that check expects to account only for consumed data (that is, if hasNext() has + * been called and made us cross an index boundary but neither readNext() nor skipNext() has yet been called, we shouldn't consider + * the index block boundary crossed yet). + * + * TODO: we don't care about this for the current file format because a row can never span multiple index blocks (further, hasNext() + * basically only reads 2 bytes from disk in that case). So once we drop backward compatibility with pre-3.0 sstables, we should + * remove this. + */ + public abstract long bytesReadForUnconsumedData(); + private static class CurrentDeserializer extends UnfilteredDeserializer { private final ClusteringPrefix.Deserializer clusteringDeserializer; @@ -216,6 +231,13 @@ public abstract class UnfilteredDeserializer isReady = false; isDone = false; } + + public long bytesReadForUnconsumedData() + { + // In theory, hasNext() does consume 2-3 bytes, but we don't care about this for the current file format, so we return // 0 to mean "do nothing".
+ return 0; + } } public static class OldFormatDeserializer extends UnfilteredDeserializer @@ -233,8 +255,8 @@ public abstract class UnfilteredDeserializer // The Unfiltered as read from the old format input private final UnfilteredIterator iterator; - // Tracks which tombstone are opened at any given point of the deserialization. Note that this - // is directly populated by UnfilteredIterator. + // The input position after the last data consumption (readNext/skipNext), or after construction/clearState(). + private long lastConsumedPosition; private OldFormatDeserializer(CFMetaData metadata, DataInputPlus in, @@ -245,6 +267,7 @@ public abstract class UnfilteredDeserializer super(metadata, in, helper); this.iterator = new UnfilteredIterator(partitionDeletion); this.readAllAsDynamic = readAllAsDynamic; + this.lastConsumedPosition = currentPosition(); } public void setSkipStatic() @@ -322,12 +345,20 @@ public abstract class UnfilteredDeserializer return nextIsRow() && ((Row)next).isStatic(); } + private long currentPosition() + { + // We return a bogus value if the input is not file based, but bytesReadForUnconsumedData checks // that we never rely on that value in that case + return in instanceof FileDataInput ? 
((FileDataInput)in).getFilePointer() : 0; + } + public Unfiltered readNext() throws IOException { if (!hasNext()) throw new IllegalStateException(); Unfiltered toReturn = next; next = null; + lastConsumedPosition = currentPosition(); return toReturn; } @@ -336,6 +367,15 @@ public abstract class UnfilteredDeserializer if (!hasNext()) throw new UnsupportedOperationException(); next = null; + lastConsumedPosition = currentPosition(); + } + + public long bytesReadForUnconsumedData() + { + if (!(in instanceof FileDataInput)) + throw new AssertionError(); + + return currentPosition() - lastConsumedPosition; } public void clearState() @@ -343,6 +383,7 @@ public abstract class UnfilteredDeserializer next = null; saved = null; iterator.clearState(); + lastConsumedPosition = currentPosition(); } // Groups atoms from the input into proper Unfiltered. http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a00438a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java index 8900b31..5f280d7 100644 --- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java @@ -486,7 +486,9 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator // Check if we've crossed an index boundary (based on the mark on the beginning of the index block). 
public boolean isPastCurrentBlock() { - return reader.file.bytesPastMark(mark) >= currentIndex().width; + assert reader.deserializer != null; + long correction = reader.deserializer.bytesReadForUnconsumedData(); + return reader.file.bytesPastMark(mark) - correction >= currentIndex().width; } public int currentBlockIdx() http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a00438a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java index 01a8fb2..66c32ee 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java @@ -155,23 +155,17 @@ public class SSTableReversedIterator extends AbstractSSTableIterator buffer.reset(); boolean isFirst = true; - boolean isDone = false; // If the start might be in this block, skip everything that comes before it. if (start != null) { - while (!isDone && deserializer.hasNext() && deserializer.compareNextTo(start) <= 0) + while (deserializer.hasNext() && deserializer.compareNextTo(start) <= 0 && !stopReadingDisk()) { isFirst = false; if (deserializer.nextIsRow()) deserializer.skipNext(); else updateOpenMarker((RangeTombstoneMarker)deserializer.readNext()); - - // Note that because 'deserializer.hasNext()' may advance our file pointer, we need to always check stopReadingDisk() before any call to it, - // i.e. 
just after we've called readNext/skipNext - if (stopReadingDisk()) - isDone = true; } } @@ -183,17 +177,14 @@ public class SSTableReversedIterator extends AbstractSSTableIterator } // Now deserialize everything until we reach our requested end (if we have one) - while (!isDone - && deserializer.hasNext() - && (end == null || deserializer.compareNextTo(end) <= 0)) + while (deserializer.hasNext() + && (end == null || deserializer.compareNextTo(end) <= 0) + && !stopReadingDisk()) { Unfiltered unfiltered = deserializer.readNext(); if (!isFirst || includeFirst) buffer.add(unfiltered); - if (stopReadingDisk()) - isDone = true; - isFirst = false; if (unfiltered.isRangeTombstoneMarker())