Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 4bbd28a04 -> 19d26bcb8 refs/heads/cassandra-2.2 9715fc09b -> 53b6116d5 refs/heads/cassandra-3.0 1d05bdab2 -> cccaf7ca2 refs/heads/cassandra-3.11 f57d12ee7 -> 656cca778 refs/heads/trunk 59814db54 -> c09e298a4
Handle static and partition deletion properly on ThrottledUnfilteredIterator Patch by Zhao Yang; Reviewed by Paulo Motta for CASSANDRA-14315 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b9e9854 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b9e9854 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b9e9854 Branch: refs/heads/trunk Commit: 5b9e985474e696a83d23e7cf4bedaf360cdb1eaf Parents: 59814db Author: Zhao Yang <zhaoyangsingap...@gmail.com> Authored: Thu Mar 15 11:47:54 2018 +0800 Committer: Paulo Motta <pa...@apache.org> Committed: Mon Mar 19 21:53:44 2018 -0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/rows/ThrottledUnfilteredIterator.java | 20 ++++- .../rows/ThrottledUnfilteredIteratorTest.java | 88 +++++++++++++++++++- 3 files changed, 102 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9e9854/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fed0cd1..fbcc1bb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Handle static and partition deletion properly on ThrottledUnfilteredIterator (CASSANDRA-14315) * NodeTool clientstats should show SSL Cipher (CASSANDRA-14322) * Add ability to specify driver name and version (CASSANDRA-14275) * Abstract streaming for pluggable storage (CASSANDRA-14115) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9e9854/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java b/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java index dd33b1e..a2e8425 100644 --- a/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java +++ b/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java @@ -75,8 +75,14 @@ public class ThrottledUnfilteredIterator extends AbstractIterator<UnfilteredRowI while (throttledItr != null && throttledItr.hasNext()) throttledItr.next(); + // The original UnfilteredRowIterator may have only partition deletion or static column but without unfiltereds. + // Return the original UnfilteredRowIterator if (!origin.hasNext()) - return endOfData(); + { + if (throttledItr != null) + return endOfData(); + return throttledItr = origin; + } throttledItr = new WrappingUnfilteredRowIterator(origin) { @@ -212,11 +218,18 @@ public class ThrottledUnfilteredIterator extends AbstractIterator<UnfilteredRowI } /** - * Splits a {@link UnfilteredPartitionIterator} in {@link UnfilteredRowIterator} batches with size no higher - * than <b>maxBatchSize</b> + * Splits a {@link UnfilteredPartitionIterator} in {@link UnfilteredRowIterator} batches with size no higher than + * <b>maxBatchSize</b> + * + * @param partitionIterator + * @param maxBatchSize max number of unfiltereds in the UnfilteredRowIterator. if 0 is given, it means no throttle. + * @return */ public static CloseableIterator<UnfilteredRowIterator> throttle(UnfilteredPartitionIterator partitionIterator, int maxBatchSize) { + if (maxBatchSize == 0) // opt out + return partitionIterator; + return new AbstractIterator<UnfilteredRowIterator>() { ThrottledUnfilteredIterator current = null; @@ -232,7 +245,6 @@ public class ThrottledUnfilteredIterator extends AbstractIterator<UnfilteredRowI if (current == null && partitionIterator.hasNext()) { current = new ThrottledUnfilteredIterator(partitionIterator.next(), maxBatchSize); - assert current.hasNext() : "UnfilteredPartitionIterator should not contain empty partitions"; } if (current != null && current.hasNext()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9e9854/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java b/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java index 2d2cce0..a530521 100644 --- a/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java +++ b/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java @@ -84,6 +84,7 @@ public class ThrottledUnfilteredIteratorTest extends CQLTester static final TableMetadata metadata; static final ColumnMetadata v1Metadata; static final ColumnMetadata v2Metadata; + static final ColumnMetadata staticMetadata; static { @@ -93,9 +94,81 @@ public class ThrottledUnfilteredIteratorTest extends CQLTester .addClusteringColumn("ck2", Int32Type.instance) .addRegularColumn("v1", Int32Type.instance) .addRegularColumn("v2", Int32Type.instance) + .addStaticColumn("s1", Int32Type.instance) .build(); v1Metadata = metadata.regularAndStaticColumns().columns(false).getSimple(0); v2Metadata = metadata.regularAndStaticColumns().columns(false).getSimple(1); + staticMetadata = metadata.regularAndStaticColumns().columns(true).getSimple(0); + } + + @Test + public void emptyPartitionDeletionTest() throws Throwable + { + // create cell tombstone, range tombstone, partition deletion + createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, v1 int, v2 int, PRIMARY KEY (pk, ck1, ck2))"); + // partition deletion + execute("DELETE FROM %s USING TIMESTAMP 160 WHERE pk=1"); + + // flush and generate 1 sstable + ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()); + cfs.forceBlockingFlush(); + cfs.disableAutoCompaction(); + cfs.forceMajorCompaction(); + + assertEquals(1, cfs.getLiveSSTables().size()); + SSTableReader reader = cfs.getLiveSSTables().iterator().next(); + + try (ISSTableScanner scanner = reader.getScanner(); + CloseableIterator<UnfilteredRowIterator> throttled = ThrottledUnfilteredIterator.throttle(scanner, 100)) + { + assertTrue(throttled.hasNext()); + UnfilteredRowIterator iterator = throttled.next(); + assertFalse(throttled.hasNext()); + assertFalse(iterator.hasNext()); + assertEquals(iterator.partitionLevelDeletion().markedForDeleteAt(), 160); + } + + // test opt out + try (ISSTableScanner scanner = reader.getScanner(); + CloseableIterator<UnfilteredRowIterator> throttled = ThrottledUnfilteredIterator.throttle(scanner, 0)) + { + assertEquals(scanner, throttled); + } + } + + @Test + public void emptyStaticTest() throws Throwable + { + // create cell tombstone, range tombstone, partition deletion + createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, v1 int, v2 int static, PRIMARY KEY (pk, ck1, ck2))"); + // partition deletion + execute("UPDATE %s SET v2 = 160 WHERE pk = 1"); + + // flush and generate 1 sstable + ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()); + cfs.forceBlockingFlush(); + cfs.disableAutoCompaction(); + cfs.forceMajorCompaction(); + + assertEquals(1, cfs.getLiveSSTables().size()); + SSTableReader reader = cfs.getLiveSSTables().iterator().next(); + + try (ISSTableScanner scanner = reader.getScanner(); + CloseableIterator<UnfilteredRowIterator> throttled = ThrottledUnfilteredIterator.throttle(scanner, 100)) + { + assertTrue(throttled.hasNext()); + UnfilteredRowIterator iterator = throttled.next(); + assertFalse(throttled.hasNext()); + assertFalse(iterator.hasNext()); + assertEquals(Int32Type.instance.getSerializer().deserialize(iterator.staticRow().cells().iterator().next().value()), new Integer(160)); + } + + // test opt out + try (ISSTableScanner scanner = reader.getScanner(); + CloseableIterator<UnfilteredRowIterator> throttled = ThrottledUnfilteredIterator.throttle(scanner, 0)) + { + assertEquals(scanner, throttled); + } } @Test @@ -296,7 +369,7 @@ public class ThrottledUnfilteredIteratorTest extends CQLTester origin = rows(metadata.regularAndStaticColumns(), 1, new DeletionTime(0, 100), - Rows.EMPTY_STATIC_ROW, + createStaticRow(createCell(staticMetadata, 160)), rows.toArray(new Row[0])); throttledIterator = new ThrottledUnfilteredIterator(origin, throttle); @@ -343,7 +416,7 @@ public class ThrottledUnfilteredIteratorTest extends CQLTester { origin = partitions(metadata.regularAndStaticColumns(), new DeletionTime(0, 100), - Rows.EMPTY_STATIC_ROW, + createStaticRow(createCell(staticMetadata, 160)), partitions); throttledIterator = ThrottledUnfilteredIterator.throttle(origin, throttle); @@ -364,7 +437,7 @@ public class ThrottledUnfilteredIteratorTest extends CQLTester UnfilteredRowIterator current = rows(metadata.regularAndStaticColumns(), currentPartition, new DeletionTime(0, 100), - Rows.EMPTY_STATIC_ROW, + createStaticRow(createCell(staticMetadata, 160)), partitions.get(currentPartition).toArray(new Row[0])); assertMetadata(current, splitted, currentSplit == 1); // no op @@ -508,6 +581,15 @@ public class ThrottledUnfilteredIteratorTest extends CQLTester return builder.build(); } + private static Row createStaticRow(Cell... columns) + { + Row.Builder builder = new BTreeRow.Builder(true); + builder.newRow(Clustering.STATIC_CLUSTERING); + for (Cell cell : columns) + builder.addCell(cell); + return builder.build(); + } + private static Cell createCell(ColumnMetadata metadata, int v) { return createCell(metadata, v, 100L, BufferCell.NO_DELETION_TIME); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org