Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 4bbd28a04 -> 19d26bcb8
  refs/heads/cassandra-2.2 9715fc09b -> 53b6116d5
  refs/heads/cassandra-3.0 1d05bdab2 -> cccaf7ca2
  refs/heads/cassandra-3.11 f57d12ee7 -> 656cca778
  refs/heads/trunk 59814db54 -> c09e298a4


Handle static and partition deletion properly on ThrottledUnfilteredIterator

Patch by Zhao Yang; Reviewed by Paulo Motta for CASSANDRA-14315


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b9e9854
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b9e9854
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b9e9854

Branch: refs/heads/trunk
Commit: 5b9e985474e696a83d23e7cf4bedaf360cdb1eaf
Parents: 59814db
Author: Zhao Yang <zhaoyangsingap...@gmail.com>
Authored: Thu Mar 15 11:47:54 2018 +0800
Committer: Paulo Motta <pa...@apache.org>
Committed: Mon Mar 19 21:53:44 2018 -0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/rows/ThrottledUnfilteredIterator.java    | 20 ++++-
 .../rows/ThrottledUnfilteredIteratorTest.java   | 88 +++++++++++++++++++-
 3 files changed, 102 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9e9854/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fed0cd1..fbcc1bb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Handle static and partition deletion properly on 
ThrottledUnfilteredIterator (CASSANDRA-14315)
  * NodeTool clientstats should show SSL Cipher (CASSANDRA-14322)
  * Add ability to specify driver name and version (CASSANDRA-14275)
  * Abstract streaming for pluggable storage (CASSANDRA-14115)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9e9854/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java 
b/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java
index dd33b1e..a2e8425 100644
--- a/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java
@@ -75,8 +75,14 @@ public class ThrottledUnfilteredIterator extends 
AbstractIterator<UnfilteredRowI
         while (throttledItr != null && throttledItr.hasNext())
             throttledItr.next();
 
+        // The original UnfilteredRowIterator may have only partition deletion 
or static column but without unfiltereds.
+        // Return the original UnfilteredRowIterator
         if (!origin.hasNext())
-            return endOfData();
+        {
+            if (throttledItr != null)
+                return endOfData();
+            return throttledItr = origin;
+        }
 
         throttledItr = new WrappingUnfilteredRowIterator(origin)
         {
@@ -212,11 +218,18 @@ public class ThrottledUnfilteredIterator extends 
AbstractIterator<UnfilteredRowI
     }
 
     /**
-     * Splits a {@link UnfilteredPartitionIterator} in {@link 
UnfilteredRowIterator} batches with size no higher
-     * than <b>maxBatchSize</b>
+     * Splits a {@link UnfilteredPartitionIterator} in {@link 
UnfilteredRowIterator} batches with size no higher than
+     * <b>maxBatchSize</b>
+     *
+     * @param partitionIterator
+     * @param maxBatchSize max number of unfiltereds in the 
UnfilteredRowIterator. if 0 is given, it means no throttle.
+     * @return
      */
     public static CloseableIterator<UnfilteredRowIterator> 
throttle(UnfilteredPartitionIterator partitionIterator, int maxBatchSize)
     {
+        if (maxBatchSize == 0) // opt out
+            return partitionIterator;
+
         return new AbstractIterator<UnfilteredRowIterator>()
         {
             ThrottledUnfilteredIterator current = null;
@@ -232,7 +245,6 @@ public class ThrottledUnfilteredIterator extends 
AbstractIterator<UnfilteredRowI
                 if (current == null && partitionIterator.hasNext())
                 {
                     current = new 
ThrottledUnfilteredIterator(partitionIterator.next(), maxBatchSize);
-                    assert current.hasNext() : "UnfilteredPartitionIterator 
should not contain empty partitions";
                 }
 
                 if (current != null && current.hasNext())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9e9854/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java 
b/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java
index 2d2cce0..a530521 100644
--- 
a/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java
+++ 
b/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java
@@ -84,6 +84,7 @@ public class ThrottledUnfilteredIteratorTest extends CQLTester
     static final TableMetadata metadata;
     static final ColumnMetadata v1Metadata;
     static final ColumnMetadata v2Metadata;
+    static final ColumnMetadata staticMetadata;
 
     static
     {
@@ -93,9 +94,81 @@ public class ThrottledUnfilteredIteratorTest extends 
CQLTester
                                 .addClusteringColumn("ck2", Int32Type.instance)
                                 .addRegularColumn("v1", Int32Type.instance)
                                 .addRegularColumn("v2", Int32Type.instance)
+                                .addStaticColumn("s1", Int32Type.instance)
                                 .build();
         v1Metadata = 
metadata.regularAndStaticColumns().columns(false).getSimple(0);
         v2Metadata = 
metadata.regularAndStaticColumns().columns(false).getSimple(1);
+        staticMetadata = 
metadata.regularAndStaticColumns().columns(true).getSimple(0);
+    }
+
+    @Test
+    public void emptyPartitionDeletionTest() throws Throwable
+    {
+        // create cell tombstone, range tombstone, partition deletion
+        createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, v1 int, v2 
int, PRIMARY KEY (pk, ck1, ck2))");
+        // partition deletion
+        execute("DELETE FROM %s USING TIMESTAMP 160 WHERE pk=1");
+
+        // flush and generate 1 sstable
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(currentTable());
+        cfs.forceBlockingFlush();
+        cfs.disableAutoCompaction();
+        cfs.forceMajorCompaction();
+
+        assertEquals(1, cfs.getLiveSSTables().size());
+        SSTableReader reader = cfs.getLiveSSTables().iterator().next();
+
+        try (ISSTableScanner scanner = reader.getScanner();
+                CloseableIterator<UnfilteredRowIterator> throttled = 
ThrottledUnfilteredIterator.throttle(scanner, 100))
+        {
+            assertTrue(throttled.hasNext());
+            UnfilteredRowIterator iterator = throttled.next();
+            assertFalse(throttled.hasNext());
+            assertFalse(iterator.hasNext());
+            
assertEquals(iterator.partitionLevelDeletion().markedForDeleteAt(), 160);
+        }
+
+        // test opt out
+        try (ISSTableScanner scanner = reader.getScanner();
+                CloseableIterator<UnfilteredRowIterator> throttled = 
ThrottledUnfilteredIterator.throttle(scanner, 0))
+        {
+            assertEquals(scanner, throttled);
+        }
+    }
+
+    @Test
+    public void emptyStaticTest() throws Throwable
+    {
+        // create cell tombstone, range tombstone, partition deletion
+        createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, v1 int, v2 int 
static, PRIMARY KEY (pk, ck1, ck2))");
+        // partition deletion
+        execute("UPDATE %s SET v2 = 160 WHERE pk = 1");
+
+        // flush and generate 1 sstable
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(currentTable());
+        cfs.forceBlockingFlush();
+        cfs.disableAutoCompaction();
+        cfs.forceMajorCompaction();
+
+        assertEquals(1, cfs.getLiveSSTables().size());
+        SSTableReader reader = cfs.getLiveSSTables().iterator().next();
+
+        try (ISSTableScanner scanner = reader.getScanner();
+             CloseableIterator<UnfilteredRowIterator> throttled = 
ThrottledUnfilteredIterator.throttle(scanner, 100))
+        {
+            assertTrue(throttled.hasNext());
+            UnfilteredRowIterator iterator = throttled.next();
+            assertFalse(throttled.hasNext());
+            assertFalse(iterator.hasNext());
+            
assertEquals(Int32Type.instance.getSerializer().deserialize(iterator.staticRow().cells().iterator().next().value()),
 new Integer(160));
+        }
+
+        // test opt out
+        try (ISSTableScanner scanner = reader.getScanner();
+             CloseableIterator<UnfilteredRowIterator> throttled = 
ThrottledUnfilteredIterator.throttle(scanner, 0))
+        {
+            assertEquals(scanner, throttled);
+        }
     }
 
     @Test
@@ -296,7 +369,7 @@ public class ThrottledUnfilteredIteratorTest extends 
CQLTester
             origin = rows(metadata.regularAndStaticColumns(),
                           1,
                           new DeletionTime(0, 100),
-                          Rows.EMPTY_STATIC_ROW,
+                          createStaticRow(createCell(staticMetadata, 160)),
                           rows.toArray(new Row[0]));
             throttledIterator = new ThrottledUnfilteredIterator(origin, 
throttle);
 
@@ -343,7 +416,7 @@ public class ThrottledUnfilteredIteratorTest extends 
CQLTester
         {
             origin = partitions(metadata.regularAndStaticColumns(),
                                 new DeletionTime(0, 100),
-                                Rows.EMPTY_STATIC_ROW,
+                                createStaticRow(createCell(staticMetadata, 
160)),
                                 partitions);
             throttledIterator = ThrottledUnfilteredIterator.throttle(origin, 
throttle);
 
@@ -364,7 +437,7 @@ public class ThrottledUnfilteredIteratorTest extends 
CQLTester
                 UnfilteredRowIterator current = 
rows(metadata.regularAndStaticColumns(),
                                                      currentPartition,
                                                      new DeletionTime(0, 100),
-                                                     Rows.EMPTY_STATIC_ROW,
+                                                     
createStaticRow(createCell(staticMetadata, 160)),
                                                      
partitions.get(currentPartition).toArray(new Row[0]));
                 assertMetadata(current, splitted, currentSplit == 1);
                 // no op
@@ -508,6 +581,15 @@ public class ThrottledUnfilteredIteratorTest extends 
CQLTester
         return builder.build();
     }
 
+    private static Row createStaticRow(Cell... columns)
+    {
+        Row.Builder builder = new BTreeRow.Builder(true);
+        builder.newRow(Clustering.STATIC_CLUSTERING);
+        for (Cell cell : columns)
+            builder.addCell(cell);
+        return builder.build();
+    }
+
     private static Cell createCell(ColumnMetadata metadata, int v)
     {
         return createCell(metadata, v, 100L, BufferCell.NO_DELETION_TIME);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to