Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 0e96d3e52 -> 9b48a0bf4
  refs/heads/trunk c83729f41 -> 0541597e7


Only open one sstable scanner per sstable

Patch by marcuse; reviewed by Paulo Motta for CASSANDRA-11412


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9b48a0bf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9b48a0bf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9b48a0bf

Branch: refs/heads/cassandra-3.0
Commit: 9b48a0bf430b995332e1a4dde20ba7482175ef99
Parents: 0e96d3e
Author: Marcus Eriksson <marc...@apache.org>
Authored: Thu Mar 31 16:32:11 2016 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Apr 20 06:28:55 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compaction/AbstractCompactionStrategy.java  | 11 ++++--
 .../compaction/CompactionStrategyManager.java   | 21 +++-------
 .../compaction/LeveledCompactionStrategy.java   | 41 ++++++++++++--------
 .../io/sstable/format/big/BigTableReader.java   |  5 ++-
 5 files changed, 43 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b48a0bf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6586299..cc50a23 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.6
+ * Only open one sstable scanner per sstable (CASSANDRA-11412)
  * Option to specify ProtocolVersion in cassandra-stress (CASSANDRA-11410)
  * ArithmeticException in avgFunctionForDecimal (CASSANDRA-11485)
  * LogAwareFileLister should only use OLD sstable files in current folder to 
determine disk consistency (CASSANDRA-11470)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b48a0bf/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index ae8839e..c205d5c 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -263,6 +263,11 @@ public abstract class AbstractCompactionStrategy
         });
     }
 
+
+    public ScannerList getScanners(Collection<SSTableReader> sstables, 
Range<Token> range)
+    {
+        return range == null ? getScanners(sstables, 
(Collection<Range<Token>>)null) : getScanners(sstables, 
Collections.singleton(range));
+    }
     /**
      * Returns a list of KeyScanners given sstables and a range on which to 
scan.
      * The default implementation simply grab one SSTableScanner per-sstable, 
but overriding this method
@@ -270,14 +275,14 @@ public abstract class AbstractCompactionStrategy
      * LeveledCompactionStrategy for instance).
      */
     @SuppressWarnings("resource")
-    public ScannerList getScanners(Collection<SSTableReader> sstables, 
Range<Token> range)
+    public ScannerList getScanners(Collection<SSTableReader> sstables, 
Collection<Range<Token>> ranges)
     {
         RateLimiter limiter = CompactionManager.instance.getRateLimiter();
         ArrayList<ISSTableScanner> scanners = new ArrayList<ISSTableScanner>();
         try
         {
             for (SSTableReader sstable : sstables)
-                scanners.add(sstable.getScanner(range, limiter));
+                scanners.add(sstable.getScanner(ranges, limiter));
         }
         catch (Throwable t)
         {
@@ -349,7 +354,7 @@ public abstract class AbstractCompactionStrategy
 
     public ScannerList getScanners(Collection<SSTableReader> toCompact)
     {
-        return getScanners(toCompact, null);
+        return getScanners(toCompact, (Collection<Range<Token>>)null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b48a0bf/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index bd72c64..82fd872 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -353,7 +353,7 @@ public class CompactionStrategyManager implements 
INotificationConsumer
      *
      * Delegates the call to the compaction strategies to allow LCS to create 
a scanner
      * @param sstables
-     * @param range
+     * @param ranges
      * @return
      */
     @SuppressWarnings("resource")
@@ -370,25 +370,16 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         }
 
         Set<ISSTableScanner> scanners = new HashSet<>(sstables.size());
-
-        for (Range<Token> range : ranges)
-        {
-            AbstractCompactionStrategy.ScannerList repairedScanners = 
repaired.getScanners(repairedSSTables, range);
-            AbstractCompactionStrategy.ScannerList unrepairedScanners = 
unrepaired.getScanners(unrepairedSSTables, range);
-
-            for (ISSTableScanner scanner : 
Iterables.concat(repairedScanners.scanners, unrepairedScanners.scanners))
-            {
-                if (!scanners.add(scanner))
-                    scanner.close();
-            }
-        }
-
+        AbstractCompactionStrategy.ScannerList repairedScanners = 
repaired.getScanners(repairedSSTables, ranges);
+        AbstractCompactionStrategy.ScannerList unrepairedScanners = 
unrepaired.getScanners(unrepairedSSTables, ranges);
+        scanners.addAll(repairedScanners.scanners);
+        scanners.addAll(unrepairedScanners.scanners);
         return new AbstractCompactionStrategy.ScannerList(new 
ArrayList<>(scanners));
     }
 
     public synchronized AbstractCompactionStrategy.ScannerList 
getScanners(Collection<SSTableReader> sstables)
     {
-        return getScanners(sstables, Collections.singleton(null));
+        return getScanners(sstables, null);
     }
 
     public Collection<Collection<SSTableReader>> 
groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b48a0bf/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 953971a..5a5cbef 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -216,7 +216,7 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy
         return maxSSTableSizeInMB * 1024L * 1024L;
     }
 
-    public ScannerList getScanners(Collection<SSTableReader> sstables, 
Range<Token> range)
+    public ScannerList getScanners(Collection<SSTableReader> sstables, 
Collection<Range<Token>> ranges)
     {
         Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
         for (SSTableReader sstable : sstables)
@@ -235,16 +235,16 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy
                 {
                     // L0 makes no guarantees about overlapping-ness.  Just 
create a direct scanner for each
                     for (SSTableReader sstable : byLevel.get(level))
-                        scanners.add(sstable.getScanner(range, 
CompactionManager.instance.getRateLimiter()));
+                        scanners.add(sstable.getScanner(ranges, 
CompactionManager.instance.getRateLimiter()));
                 }
                 else
                 {
                     // Create a LeveledScanner that only opens one sstable at 
a time, in sorted order
-                    List<SSTableReader> intersecting = 
LeveledScanner.intersecting(byLevel.get(level), range);
+                    Collection<SSTableReader> intersecting = 
LeveledScanner.intersecting(byLevel.get(level), ranges);
                     if (!intersecting.isEmpty())
                     {
                         @SuppressWarnings("resource") // The ScannerList will 
be in charge of closing (and we close properly on errors)
-                        ISSTableScanner scanner = new 
LeveledScanner(intersecting, range);
+                        ISSTableScanner scanner = new 
LeveledScanner(intersecting, ranges);
                         scanners.add(scanner);
                     }
                 }
@@ -288,7 +288,7 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy
     // same level (e.g. non overlapping) - see #4142
     private static class LeveledScanner extends 
AbstractIterator<UnfilteredRowIterator> implements ISSTableScanner
     {
-        private final Range<Token> range;
+        private final Collection<Range<Token>> ranges;
         private final List<SSTableReader> sstables;
         private final Iterator<SSTableReader> sstableIterator;
         private final long totalLength;
@@ -296,9 +296,9 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy
         private ISSTableScanner currentScanner;
         private long positionOffset;
 
-        public LeveledScanner(Collection<SSTableReader> sstables, Range<Token> 
range)
+        public LeveledScanner(Collection<SSTableReader> sstables, 
Collection<Range<Token>> ranges)
         {
-            this.range = range;
+            this.ranges = ranges;
 
             // add only sstables that intersect our range, and estimate how 
much data that involves
             this.sstables = new ArrayList<>(sstables.size());
@@ -309,8 +309,8 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy
                 long estimatedKeys = sstable.estimatedKeys();
                 double estKeysInRangeRatio = 1.0;
 
-                if (estimatedKeys > 0 && range != null)
-                    estKeysInRangeRatio = ((double) 
sstable.estimatedKeysForRanges(Collections.singleton(range))) / estimatedKeys;
+                if (estimatedKeys > 0 && ranges != null)
+                    estKeysInRangeRatio = ((double) 
sstable.estimatedKeysForRanges(ranges)) / estimatedKeys;
 
                 length += sstable.uncompressedLength() * estKeysInRangeRatio;
             }
@@ -319,21 +319,28 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy
             Collections.sort(this.sstables, SSTableReader.sstableComparator);
             sstableIterator = this.sstables.iterator();
             assert sstableIterator.hasNext(); // caller should check 
intersecting first
-            currentScanner = sstableIterator.next().getScanner(range, 
CompactionManager.instance.getRateLimiter());
+            currentScanner = sstableIterator.next().getScanner(ranges, 
CompactionManager.instance.getRateLimiter());
         }
 
-        public static List<SSTableReader> 
intersecting(Collection<SSTableReader> sstables, Range<Token> range)
+        public static Collection<SSTableReader> 
intersecting(Collection<SSTableReader> sstables, Collection<Range<Token>> 
ranges)
         {
-            ArrayList<SSTableReader> filtered = new ArrayList<>();
-            for (SSTableReader sstable : sstables)
+            if (ranges == null)
+                return Lists.newArrayList(sstables);
+
+            Set<SSTableReader> filtered = new HashSet<>();
+            for (Range<Token> range : ranges)
             {
-                Range<Token> sstableRange = new 
Range<>(sstable.first.getToken(), sstable.last.getToken());
-                if (range == null || sstableRange.intersects(range))
-                    filtered.add(sstable);
+                for (SSTableReader sstable : sstables)
+                {
+                    Range<Token> sstableRange = new 
Range<>(sstable.first.getToken(), sstable.last.getToken());
+                    if (range == null || sstableRange.intersects(range))
+                        filtered.add(sstable);
+                }
             }
             return filtered;
         }
 
+
         public boolean isForThrift()
         {
             return false;
@@ -362,7 +369,7 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy
                     currentScanner = null;
                     return endOfData();
                 }
-                currentScanner = sstableIterator.next().getScanner(range, 
CompactionManager.instance.getRateLimiter());
+                currentScanner = sstableIterator.next().getScanner(ranges, 
CompactionManager.instance.getRateLimiter());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b48a0bf/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index dbab0f4..1fbf1f2 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -110,7 +110,10 @@ public class BigTableReader extends SSTableReader
      */
     public ISSTableScanner getScanner(Collection<Range<Token>> ranges, 
RateLimiter limiter)
     {
-        return BigTableScanner.getScanner(this, ranges, limiter);
+        if (ranges != null)
+            return BigTableScanner.getScanner(this, ranges, limiter);
+        else
+            return getScanner(limiter);
     }
 
 

Reply via email to