Repository: phoenix
Updated Branches:
  refs/heads/4.0 f7e6a6c7e -> 51f69bcb6


PHOENIX-1251 Salted queries with range scan become full table scans


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a5d07cc0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a5d07cc0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a5d07cc0

Branch: refs/heads/4.0
Commit: a5d07cc076763000f0c48c4c958e33aa578e85a9
Parents: 846ed10
Author: James Taylor <jtay...@salesforce.com>
Authored: Wed Oct 1 08:49:04 2014 -0700
Committer: James Taylor <jtay...@salesforce.com>
Committed: Wed Oct 1 08:49:04 2014 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/InListIT.java    |   2 +-
 .../org/apache/phoenix/compile/ScanRanges.java  | 239 ++++++++++--
 .../phoenix/compile/StatementContext.java       |  21 +-
 .../apache/phoenix/compile/WhereOptimizer.java  |  33 +-
 .../phoenix/index/PhoenixIndexBuilder.java      |   1 -
 .../phoenix/iterate/ParallelIterators.java      | 391 ++++++++++++++-----
 .../java/org/apache/phoenix/util/ScanUtil.java  |   4 +-
 7 files changed, 530 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5d07cc0/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
index dc60b69..60bcb65 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
@@ -163,7 +163,7 @@ public class InListIT extends BaseHBaseManagedTimeIT {
     // the different combinations to check each test against
     private static final List<Boolean> TENANCIES = Arrays.asList(false, true);
     private static final List<PDataType> INTEGER_TYPES = 
Arrays.asList(PDataType.INTEGER, PDataType.LONG);
-    private static final List<Integer> SALT_BUCKET_NUMBERS = Arrays.asList(0, 
4);
+    private static final List<Integer> SALT_BUCKET_NUMBERS = 
Arrays.asList(/*0,*/ 4);
 
     // we should be including the RANGE_SCAN hint here, but a bug with 
ParallelIterators causes tests to fail
     // see the relevant JIRA here: 
https://issues.apache.org/jira/browse/PHOENIX-1251

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5d07cc0/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index dc8e0b3..1c739f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -23,12 +23,17 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.KeyRange.Bound;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -40,8 +45,8 @@ import com.google.common.collect.Lists;
 public class ScanRanges {
     private static final List<List<KeyRange>> EVERYTHING_RANGES = 
Collections.<List<KeyRange>>emptyList();
     private static final List<List<KeyRange>> NOTHING_RANGES = 
Collections.<List<KeyRange>>singletonList(Collections.<KeyRange>singletonList(KeyRange.EMPTY_RANGE));
-    public static final ScanRanges EVERYTHING = new 
ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,EVERYTHING_RANGES, false, 
false);
-    public static final ScanRanges NOTHING = new 
ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,NOTHING_RANGES, false, false);
+    public static final ScanRanges EVERYTHING = new 
ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,EVERYTHING_RANGES, 
KeyRange.EVERYTHING_RANGE, false, false, null);
+    public static final ScanRanges NOTHING = new 
ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,NOTHING_RANGES, 
KeyRange.EMPTY_RANGE, false, false, null);
 
     public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> 
ranges, int[] slotSpan) {
         return create(schema, ranges, slotSpan, false, null);
@@ -72,37 +77,210 @@ public class ScanRanges {
                 // when there's a single key.
                 slotSpan = new int[] {schema.getMaxFields()-1};
             }
-        } else if (nBuckets != null) {
+        } /*else if (nBuckets != null) {
             List<List<KeyRange>> saltedRanges = 
Lists.newArrayListWithExpectedSize(ranges.size());
             saltedRanges.add(SaltingUtil.generateAllSaltingRanges(nBuckets));
             saltedRanges.addAll(ranges.subList(1, ranges.size()));
             ranges = saltedRanges;
+        }*/
+        List<List<KeyRange>> sortedRanges = 
Lists.newArrayListWithExpectedSize(ranges.size());
+        for (int i = 0; i < ranges.size(); i++) {
+            List<KeyRange> sorted = Lists.newArrayList(ranges.get(i));
+            Collections.sort(sorted, KeyRange.COMPARATOR);
+            sortedRanges.add(ImmutableList.copyOf(sorted));
         }
-        return new ScanRanges(schema, slotSpan, ranges, forceRangeScan, 
isPointLookup);
+        // Don't set minMaxRange for point lookup because it causes issues 
during intersect
+        // by us ignoring the salt byte
+        KeyRange minMaxRange = isPointLookup ? KeyRange.EVERYTHING_RANGE : 
calculateMinMaxRange(schema, slotSpan, sortedRanges);
+        return new ScanRanges(schema, slotSpan, sortedRanges, minMaxRange, 
forceRangeScan, isPointLookup, nBuckets);
     }
 
+    private static KeyRange calculateMinMaxRange(RowKeySchema schema, int[] 
slotSpan, List<List<KeyRange>> ranges) {
+        byte[] minKey = ScanUtil.getMinKey(schema, ranges, slotSpan);
+        byte[] maxKey = ScanUtil.getMaxKey(schema, ranges, slotSpan);
+        return KeyRange.getKeyRange(minKey, maxKey);
+    }
+    
     private SkipScanFilter filter;
     private final List<List<KeyRange>> ranges;
     private final int[] slotSpan;
     private final RowKeySchema schema;
-    private final boolean forceRangeScan;
     private final boolean isPointLookup;
+    private final boolean isSalted;
+    private final boolean useSkipScanFilter;
+    private final KeyRange minMaxRange;
 
-    private ScanRanges (RowKeySchema schema, int[] slotSpan, 
List<List<KeyRange>> ranges, boolean forceRangeScan, boolean isPointLookup) {
+    private ScanRanges (RowKeySchema schema, int[] slotSpan, 
List<List<KeyRange>> ranges, KeyRange minMaxRange, boolean forceRangeScan, 
boolean isPointLookup, Integer bucketNum) {
         this.isPointLookup = isPointLookup;
-        List<List<KeyRange>> sortedRanges = 
Lists.newArrayListWithExpectedSize(ranges.size());
-        for (int i = 0; i < ranges.size(); i++) {
-            List<KeyRange> sorted = Lists.newArrayList(ranges.get(i));
-            Collections.sort(sorted, KeyRange.COMPARATOR);
-            sortedRanges.add(ImmutableList.copyOf(sorted));
+        this.isSalted = bucketNum != null;
+        this.minMaxRange = minMaxRange;
+        this.useSkipScanFilter = useSkipScanFilter(forceRangeScan, 
isPointLookup, ranges);
+        
+        // Only blow out the bucket values if we're using the skip scan. We 
need all the
+        // bucket values in this case because we use intersect against a key 
that may have
+        // any of the possible bucket values. Otherwise, we can pretty easily 
ignore the
+        // bucket values.
+        if (useSkipScanFilter && isSalted && !isPointLookup) {
+               ranges.set(0, SaltingUtil.generateAllSaltingRanges(bucketNum));
         }
-        this.ranges = ImmutableList.copyOf(sortedRanges);
+        this.ranges = ImmutableList.copyOf(ranges);
         this.slotSpan = slotSpan;
         this.schema = schema;
-        if (schema != null && !ranges.isEmpty()) {
+        if (schema != null && !ranges.isEmpty()) { // TODO: only create if 
useSkipScanFilter is true?
             this.filter = new SkipScanFilter(this.ranges, slotSpan, schema);
         }
-        this.forceRangeScan = forceRangeScan;
+    }
+    
+    private static byte[] prefixKey(byte[] key, int keyOffset, byte[] 
prefixKey, int prefixKeyOffset) {
+        if (key.length > 0) {
+            byte[] newKey = new byte[key.length + prefixKeyOffset];
+            int totalKeyOffset = keyOffset + prefixKeyOffset;
+            System.arraycopy(prefixKey, 0, newKey, 0, totalKeyOffset);
+            System.arraycopy(key, keyOffset, newKey, totalKeyOffset, 
key.length - keyOffset);
+            return newKey;
+        }
+        return key;
+    }
+    
+    private static byte[] replaceSaltByte(byte[] key, byte[] saltKey) {
+        if (key.length == 0) {
+            return key;
+        }
+        byte[] temp = new byte[key.length];
+        System.arraycopy(saltKey, 0, temp, 0, SaltingUtil.NUM_SALTING_BYTES);
+        System.arraycopy(key, SaltingUtil.NUM_SALTING_BYTES, temp, 
SaltingUtil.NUM_SALTING_BYTES, key.length - SaltingUtil.NUM_SALTING_BYTES);
+        return temp;
+    }
+    
+    private static byte[] stripLocalIndexPrefix(byte[] key, int keyOffset) {
+        if (key.length == 0) {
+            return key;
+        }
+        byte[] temp = new byte[key.length - keyOffset];
+        System.arraycopy(key, keyOffset, temp, 0, key.length - keyOffset);
+        return temp;
+    }
+
+    public Scan intersect(Scan scan, final byte[] originalStartKey, final 
byte[] originalStopKey, final int keyOffset) {
+        byte[] startKey = originalStartKey;
+        byte[] stopKey = originalStopKey;
+        boolean mayHaveRows = false;
+        final int scanKeyOffset = this.isSalted ? 
SaltingUtil.NUM_SALTING_BYTES : 0;
+        // Offset for startKey/stopKey. Either 1 for salted tables or the 
prefix length
+        // of the current region for local indexes.
+        final int totalKeyOffset = scanKeyOffset + keyOffset;
+        // In this case, we've crossed the "prefix" boundary and should 
consider everything after the startKey
+        // This prevents us from having to prefix the key prior to knowing 
whether or not there may be an
+        // intersection.
+        byte[] prefixBytes = ByteUtil.EMPTY_BYTE_ARRAY;
+        if (totalKeyOffset > 0) {
+            prefixBytes = startKey.length > 0 ? startKey : (this.isSalted ? 
QueryConstants.SEPARATOR_BYTE_ARRAY : stopKey);
+        }
+        if (stopKey.length < totalKeyOffset || Bytes.compareTo(prefixBytes, 0, 
totalKeyOffset, stopKey, 0, totalKeyOffset) != 0) {
+            stopKey = ByteUtil.EMPTY_BYTE_ARRAY;
+        }
+        assert (scanKeyOffset == 0 || keyOffset == 0);
+        int scanStartKeyOffset = scanKeyOffset;
+        byte[] scanStartKey = scan.getStartRow();
+        // Compare ignoring key prefix and salt byte
+        if (scanStartKey.length > 0) {
+            if (startKey.length > 0 && Bytes.compareTo(scanStartKey, 
scanKeyOffset, scanStartKey.length - scanKeyOffset, startKey, totalKeyOffset, 
startKey.length - totalKeyOffset) < 0) {
+                scanStartKey = startKey;
+                scanStartKeyOffset = totalKeyOffset;
+            }
+        } else {
+               scanStartKey = startKey;
+            scanStartKeyOffset = totalKeyOffset;
+            mayHaveRows = true;
+        }
+        int scanStopKeyOffset = scanKeyOffset;
+        byte[] scanStopKey = scan.getStopRow();
+        if (scanStopKey.length > 0) {
+            if (stopKey.length > 0 && Bytes.compareTo(scanStopKey, 
scanKeyOffset, scanStopKey.length - scanKeyOffset, stopKey, totalKeyOffset, 
stopKey.length - totalKeyOffset) > 0) {
+                scanStopKey = stopKey;
+                scanStopKeyOffset = totalKeyOffset;
+            }
+        } else {
+               scanStopKey = stopKey;
+            scanStopKeyOffset = totalKeyOffset;
+            mayHaveRows = true;
+        }
+        mayHaveRows = mayHaveRows || Bytes.compareTo(scanStartKey, 
scanStartKeyOffset, scanStartKey.length - scanStartKeyOffset, scanStopKey, 
scanStopKeyOffset, scanStopKey.length - scanStopKeyOffset) < 0;
+        
+        if (!mayHaveRows) {
+            return null;
+        }
+        if (originalStopKey.length != 0 && scanStopKey.length == 0) {
+            scanStopKey = originalStopKey;
+        }
+        Filter newFilter = scan.getFilter();
+        // If the scan is using skip scan filter, intersect and replace the 
filter.
+        if (this.useSkipScanFilter()) {
+            byte[] skipScanStartKey = scanStartKey;
+            byte[] skipScanStopKey = scanStopKey;
+            // If we have a keyOffset and we've used the startKey/stopKey that
+            // were passed in (which have the prefix) for the above range 
check,
+            // we need to remove the prefix before running our intersect 
method.
+            // TODO: we could use skipScanFilter.setOffset(keyOffset) if both
+            // the startKey and stopKey were used above *and* our intersect
+            // method honored the skipScanFilter.offset variable.
+            if (scanKeyOffset > 0) {
+                if (skipScanStartKey != originalStartKey) { // original 
already has correct salt byte
+                    skipScanStartKey = replaceSaltByte(skipScanStartKey, 
prefixBytes);
+                }
+                if (skipScanStopKey != originalStopKey) {
+                    skipScanStopKey = replaceSaltByte(skipScanStopKey, 
prefixBytes);
+                }
+            } else if (keyOffset > 0) {
+                if (skipScanStartKey == originalStartKey) {
+                    skipScanStartKey = stripLocalIndexPrefix(skipScanStartKey, 
keyOffset);
+                }
+                if (skipScanStopKey == originalStopKey) {
+                    skipScanStopKey = stripLocalIndexPrefix(skipScanStopKey, 
keyOffset);
+                }
+            }
+            Filter filter = scan.getFilter();
+            if (filter instanceof SkipScanFilter) {
+                SkipScanFilter oldSkipScanFilter = (SkipScanFilter)filter;
+                newFilter = oldSkipScanFilter.intersect(skipScanStartKey, 
skipScanStopKey);
+                if (newFilter == null) {
+                    return null;
+                }
+            } else if (filter instanceof FilterList) {
+                FilterList oldList = (FilterList)filter;
+                FilterList newList = new 
FilterList(FilterList.Operator.MUST_PASS_ALL);
+                newFilter = newList;
+                for (Filter f : oldList.getFilters()) {
+                    if (f instanceof SkipScanFilter) {
+                        SkipScanFilter newSkipScanFilter = 
((SkipScanFilter)f).intersect(skipScanStartKey, skipScanStopKey);
+                        if (newSkipScanFilter == null) {
+                            return null;
+                        }
+                        newList.addFilter(newSkipScanFilter);
+                    } else {
+                        newList.addFilter(f);
+                    }
+                }
+            }
+        }
+        Scan newScan = ScanUtil.newScan(scan);
+        newScan.setFilter(newFilter);
+        // If we have an offset (salted table or local index), we need to make 
sure to
+        // prefix our scan start/stop row by the prefix of the startKey or 
stopKey that
+        // were passed in. Our scan either doesn't have the prefix or has a 
placeholder
+        // for it.
+        if (totalKeyOffset > 0) {
+            if (scanStartKey != originalStartKey) {
+                scanStartKey = prefixKey(scanStartKey, scanKeyOffset, 
prefixBytes, keyOffset);
+            }
+            if (scanStopKey != originalStopKey) {
+                scanStopKey = prefixKey(scanStopKey, scanKeyOffset, 
prefixBytes, keyOffset);
+            }
+        }
+        newScan.setStartRow(scanStartKey);
+        newScan.setStopRow(scanStopKey);
+        
+        return newScan;
     }
 
     public SkipScanFilter getSkipScanFilter() {
@@ -132,11 +310,15 @@ public class ScanRanges {
      *    not the last key slot
      */
     public boolean useSkipScanFilter() {
+        return useSkipScanFilter;
+    }
+    
+    private static boolean useSkipScanFilter(boolean forceRangeScan, boolean 
isPointLookup, List<List<KeyRange>> ranges) {
         if (forceRangeScan) {
             return false;
         }
         if (isPointLookup) {
-            return getPointLookupCount() > 1;
+            return getPointLookupCount(isPointLookup, ranges) > 1;
         }
         boolean hasRangeKey = false, useSkipScan = false;
         for (List<KeyRange> orRanges : ranges) {
@@ -208,6 +390,10 @@ public class ScanRanges {
     }
     
     public int getPointLookupCount() {
+        return getPointLookupCount(isPointLookup, ranges);
+    }
+    
+    private static int getPointLookupCount(boolean isPointLookup, 
List<List<KeyRange>> ranges) {
         return isPointLookup ? ranges.get(0).size() : 0;
     }
     
@@ -215,27 +401,10 @@ public class ScanRanges {
         return isPointLookup ? ranges.get(0).iterator() : 
Iterators.<KeyRange>emptyIterator();
     }
 
-    public void setScanStartStopRow(Scan scan) {
-        if (isEverything()) {
-            return;
-        }
-        if (isDegenerate()) {
-            scan.setStartRow(KeyRange.EMPTY_RANGE.getLowerRange());
-            scan.setStopRow(KeyRange.EMPTY_RANGE.getUpperRange());
-            return;
-        }
-        
-        byte[] expectedKey;
-        expectedKey = ScanUtil.getMinKey(schema, ranges, slotSpan);
-        if (expectedKey != null) {
-            scan.setStartRow(expectedKey);
-        }
-        expectedKey = ScanUtil.getMaxKey(schema, ranges, slotSpan);
-        if (expectedKey != null) {
-            scan.setStopRow(expectedKey);
-        }
+    public KeyRange getMinMaxRange() {
+        return minMaxRange;
     }
-
+    
     public static final ImmutableBytesWritable UNBOUND = new 
ImmutableBytesWritable(KeyRange.UNBOUND);
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5d07cc0/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 56e63ae..90264bb 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -192,14 +192,14 @@ public class StatementContext {
     }
     
     public void setScanRanges(ScanRanges scanRanges) {
-        setScanRanges(scanRanges, null);
+        setScanRanges(scanRanges, KeyRange.EVERYTHING_RANGE);
     }
 
     public void setScanRanges(ScanRanges scanRanges, KeyRange minMaxRange) {
         this.scanRanges = scanRanges;
-        this.scanRanges.setScanStartStopRow(scan);
-        PTable table = this.getCurrentTable().getTable();
-        if (minMaxRange != null) {
+        KeyRange scanRange = scanRanges.getMinMaxRange();
+        if (minMaxRange != KeyRange.EVERYTHING_RANGE) {
+            PTable table = this.getCurrentTable().getTable();
             // Ensure minMaxRange is lower inclusive and upper exclusive, as 
that's
             // what we need to intersect against for the HBase scan.
             byte[] lowerRange = minMaxRange.getLowerRange();
@@ -216,17 +216,12 @@ public class StatementContext {
                 }
             }
             if (minMaxRange.getLowerRange() != lowerRange || 
minMaxRange.getUpperRange() != upperRange) {
-                minMaxRange = KeyRange.getKeyRange(lowerRange, true, 
upperRange, false);
-            }
-            // If we're not salting, we can intersect this now with the scan 
range.
-            // Otherwise, we have to wait to do this when we chunk up the scan.
-            if (table.getBucketNum() == null) {
-                minMaxRange = 
minMaxRange.intersect(KeyRange.getKeyRange(scan.getStartRow(), 
scan.getStopRow()));
-                scan.setStartRow(minMaxRange.getLowerRange());
-                scan.setStopRow(minMaxRange.getUpperRange());
+                minMaxRange = KeyRange.getKeyRange(lowerRange, upperRange);
             }
-            this.minMaxRange = minMaxRange;
+            scanRange = scanRange.intersect(minMaxRange);
         }
+        scan.setStartRow(scanRange.getLowerRange());
+        scan.setStopRow(scanRange.getUpperRange());
     }
     
     public PhoenixConnection getConnection() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5d07cc0/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index 51da924..23abf06 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -58,6 +58,7 @@ import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
@@ -146,14 +147,30 @@ public class WhereOptimizer {
         RowKeySchema schema = table.getRowKeySchema();
         List<List<KeyRange>> cnf = 
Lists.newArrayListWithExpectedSize(schema.getMaxFields());
         KeyRange minMaxRange = keySlots.getMinMaxRange();
-        boolean hasMinMaxRange = (minMaxRange != null);
+        if (minMaxRange == null) {
+            minMaxRange = KeyRange.EVERYTHING_RANGE;
+        }
+        boolean hasMinMaxRange = (minMaxRange != KeyRange.EVERYTHING_RANGE);
         int minMaxRangeOffset = 0;
         byte[] minMaxRangePrefix = null;
+        boolean isSalted = nBuckets != null;
+        boolean isMultiTenant = tenantId != null && table.isMultiTenant();
+        boolean hasViewIndex = table.getViewIndexId() != null;
+        if (hasMinMaxRange) {
+            int minMaxRangeSize = (isSalted ? SaltingUtil.NUM_SALTING_BYTES : 
0)
+                    + (isMultiTenant ? tenantId.getBytes().length + 1 : 0) 
+                    + (hasViewIndex ? 
MetaDataUtil.getViewIndexIdDataType().getByteSize() : 0);
+            minMaxRangePrefix = new byte[minMaxRangeSize];
+        }
         
         Iterator<KeyExpressionVisitor.KeySlot> iterator = keySlots.iterator();
         // Add placeholder for salt byte ranges
-        if (nBuckets != null) {
+        if (isSalted) {
             cnf.add(SALT_PLACEHOLDER);
+            if (hasMinMaxRange) {
+                   System.arraycopy(SALT_PLACEHOLDER.get(0).getLowerRange(), 
0, minMaxRangePrefix, minMaxRangeOffset, SaltingUtil.NUM_SALTING_BYTES);
+                   minMaxRangeOffset += SaltingUtil.NUM_SALTING_BYTES;
+            }
             // Increment the pkPos, as the salt column is in the row schema
             // Do not increment the iterator, though, as there will never be
             // an expression in the keySlots for the salt column
@@ -161,13 +178,12 @@ public class WhereOptimizer {
         }
         
         // Add tenant data isolation for tenant-specific tables
-        if (tenantId != null && table.isMultiTenant()) {
+        if (isMultiTenant) {
             byte[] tenantIdBytes = tenantId.getBytes();
             KeyRange tenantIdKeyRange = KeyRange.getKeyRange(tenantIdBytes);
             cnf.add(singletonList(tenantIdKeyRange));
             if (hasMinMaxRange) {
-                minMaxRangePrefix = new byte[tenantIdBytes.length + 
MetaDataUtil.getViewIndexIdDataType().getByteSize() + 1];
-                System.arraycopy(tenantIdBytes, 0, minMaxRangePrefix, 0, 
tenantIdBytes.length);
+                System.arraycopy(tenantIdBytes, 0, minMaxRangePrefix, 
minMaxRangeOffset, tenantIdBytes.length);
                 minMaxRangeOffset += tenantIdBytes.length;
                 if (!schema.getField(pkPos).getDataType().isFixedWidth()) {
                     minMaxRangePrefix[minMaxRangeOffset] = 
QueryConstants.SEPARATOR_BYTE;
@@ -178,14 +194,11 @@ public class WhereOptimizer {
         }
         // Add unique index ID for shared indexes on views. This ensures
         // that different indexes don't interleave.
-        if (table.getViewIndexId() != null) {
+        if (hasViewIndex) {
             byte[] viewIndexBytes = 
MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
             KeyRange indexIdKeyRange = KeyRange.getKeyRange(viewIndexBytes);
             cnf.add(singletonList(indexIdKeyRange));
             if (hasMinMaxRange) {
-                if (minMaxRangePrefix == null) {
-                    minMaxRangePrefix = new byte[viewIndexBytes.length];
-                }
                 System.arraycopy(viewIndexBytes, 0, minMaxRangePrefix, 
minMaxRangeOffset, viewIndexBytes.length);
                 minMaxRangeOffset += viewIndexBytes.length;
             }
@@ -194,7 +207,7 @@ public class WhereOptimizer {
         
         // Prepend minMaxRange with fixed column values so we can properly 
intersect the
         // range with the other range.
-        if (minMaxRange != null) {
+        if (hasMinMaxRange) {
             minMaxRange = minMaxRange.prependRange(minMaxRangePrefix, 0, 
minMaxRangeOffset);
         }
         boolean forcedSkipScan = statement.getHint().hasHint(Hint.SKIP_SCAN);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5d07cc0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index de5a9cc..6897106 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -71,7 +71,6 @@ public class PhoenixIndexBuilder extends 
CoveredColumnsIndexBuilder {
         if (maintainers.isEmpty()) return;
         Scan scan = IndexManagementUtil.newLocalStateScan(new 
ArrayList<IndexMaintainer>(maintainers.values()));
         ScanRanges scanRanges = 
ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, 
Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
-        scanRanges.setScanStartStopRow(scan);
         scan.setFilter(scanRanges.getSkipScanFilter());
         HRegion region = this.env.getRegion();
         RegionScanner scanner = region.getScanner(scan);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5d07cc0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index a2dabe3..4145def 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -17,12 +17,9 @@
  */
 package org.apache.phoenix.iterate;
 
-import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
-
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -36,6 +33,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
@@ -44,6 +42,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.filter.ColumnProjectionFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -55,15 +54,15 @@ import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.ViewType;
-import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SQLCloseables;
@@ -74,6 +73,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 
@@ -87,6 +87,7 @@ import com.google.common.collect.Lists;
  */
 public class ParallelIterators extends ExplainTable implements ResultIterators 
{
        private static final Logger logger = 
LoggerFactory.getLogger(ParallelIterators.class);
+    private final List<List<Scan>> scans;
     private final List<KeyRange> splits;
     private final ParallelIteratorFactory iteratorFactory;
     
@@ -95,6 +96,7 @@ public class ParallelIterators extends ExplainTable 
implements ResultIterators {
     }
 
     private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min
+    private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20;
 
     static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new 
Function<HRegionLocation, KeyRange>() {
         @Override
@@ -107,10 +109,8 @@ public class ParallelIterators extends ExplainTable 
implements ResultIterators {
             RowProjector projector, GroupBy groupBy, Integer limit, 
ParallelIteratorFactory iteratorFactory)
             throws SQLException {
         super(context, tableRef, groupBy);
-        this.splits = getSplits(context, tableRef, statement.getHint());
-        this.iteratorFactory = iteratorFactory;
-        Scan scan = context.getScan();
         PTable table = tableRef.getTable();
+        Scan scan = context.getScan();
         if (projector.isProjectEmptyKeyValue()) {
             Map<byte [], NavigableSet<byte []>> familyMap = 
scan.getFamilyMap();
             // If nothing projected into scan and we only have one column 
family, just allow everything
@@ -144,6 +144,18 @@ public class ParallelIterators extends ExplainTable 
implements ResultIterators {
         }
 
         doColumnProjectionOptimization(context, scan, table, statement);
+        
+        this.iteratorFactory = iteratorFactory;
+        // TODO: get physicalTable here if we don't have it
+        PTable physicalTable = table;
+        this.scans = getParallelScans(physicalTable);
+        List<KeyRange> splitRanges = 
Lists.newArrayListWithExpectedSize(scans.size() * 
ESTIMATED_GUIDEPOSTS_PER_REGION);
+        for (List<Scan> scanList : scans) {
+            for (Scan aScan : scanList) {
+                splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), 
aScan.getStopRow()));
+            }
+        }
+        this.splits = ImmutableList.copyOf(splitRanges);
     }
 
     private void doColumnProjectionOptimization(StatementContext context, Scan 
scan, PTable table, FilterableStatement statement) {
@@ -249,6 +261,169 @@ public class ParallelIterators extends ExplainTable 
implements ResultIterators {
         return splits;
     }
 
+    private static List<byte[]> toBoundaries(List<HRegionLocation> 
regionLocations) {
+        int nBoundaries = regionLocations.size() - 1;
+        List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries);
+        for (int i = 0; i < nBoundaries; i++) {
+            HRegionInfo regionInfo = regionLocations.get(i).getRegionInfo();
+            ranges.add(regionInfo.getEndKey());
+        }
+        return ranges;
+    }
+    
+    private static int getIndexContainingInclusive(List<byte[]> boundaries, 
byte[] inclusiveKey) {
+        int guideIndex = Collections.binarySearch(boundaries, inclusiveKey, 
Bytes.BYTES_COMPARATOR);
+        // If we found an exact match, return the index+1, as the inclusiveKey 
will be contained
+        // in the next region (since we're matching on the end boundary).
+        guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1));
+        return guideIndex;
+    }
+    
+    private static int getIndexContainingExclusive(List<byte[]> boundaries, 
byte[] exclusiveKey) {
+        int guideIndex = Collections.binarySearch(boundaries, exclusiveKey, 
Bytes.BYTES_COMPARATOR);
+        // If we found an exact match, return the index we found as the 
exclusiveKey won't be
+        // contained in the next region as with getIndexContainingInclusive.
+        guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : guideIndex);
+        return guideIndex;
+    }
+    
+    private List<byte[]> getGuidePosts(PTable table) {
+        Scan scan = context.getScan();
+        boolean isPointLookup = context.getScanRanges().isPointLookup();
+        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
+        List<byte[]> gps = Collections.emptyList();
+        /*
+         *  Don't use guide posts if:
+         *  1) We're doing a point lookup, as HBase is fast enough at those
+         *     to not need them to be further parallelized. TODO: pref test to 
verify
+         *  2) We're collecting stats, as in this case we need to scan entire
+         *     regions worth of data to track where to put the guide posts.
+         */
+        if (!isPointLookup && !ScanUtil.isAnalyzeTable(scan)) {
+            if (table.getColumnFamilies().isEmpty()) {
+                // For sure we can get the defaultCF from the table
+                return table.getGuidePosts();
+            }
+            try {
+                if (scan.getFamilyMap().size() > 0 && 
!scan.getFamilyMap().containsKey(defaultCF)) {
+                    // If default CF is not used in scan, use first CF 
referenced in scan
+                    return 
table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
+                }
+                // Otherwise, favor use of default CF.
+                return table.getColumnFamily(defaultCF).getGuidePosts();
+            } catch (ColumnFamilyNotFoundException cfne) {
+                // Alter table does this
+            }
+        }
+        return gps;
+        
+    }
+    
+    /**
+     * Compute the list of parallel scans to run for a given query. The inner 
scans
+     * may be concatenated together directly, while the other ones may need to 
be
+     * merge sorted, depending on the query.
+     * @param physicalTable
+     * @return list of parallel scans to run for a given query.
+     * @throws SQLException
+     */
+    private List<List<Scan>> getParallelScans(PTable physicalTable) throws 
SQLException {
+        List<HRegionLocation> regionLocations = 
context.getConnection().getQueryServices()
+                
.getAllTableRegions(physicalTable.getPhysicalName().getBytes());
+        List<byte[]> regionBoundaries = toBoundaries(regionLocations);
+        final Scan scan = context.getScan();
+        ScanRanges scanRanges = context.getScanRanges();
+        boolean isSalted = physicalTable.getBucketNum() != null;
+        boolean isLocalIndex = physicalTable.getIndexType() == IndexType.LOCAL;
+        List<byte[]> gps = getGuidePosts(physicalTable);
+        boolean traverseAllRegions = isSalted || isLocalIndex;
+        
+        byte[] startKey = ByteUtil.EMPTY_BYTE_ARRAY;
+        byte[] currentKey = ByteUtil.EMPTY_BYTE_ARRAY;
+        byte[] stopKey = ByteUtil.EMPTY_BYTE_ARRAY;
+        int regionIndex = 0;
+        int stopIndex = regionBoundaries.size();
+        if (!traverseAllRegions) {
+            startKey = scan.getStartRow();
+            if (startKey.length > 0) {
+                currentKey = startKey;
+                regionIndex = getIndexContainingInclusive(regionBoundaries, 
startKey);
+            }
+            stopKey = scan.getStopRow();
+            if (stopKey.length > 0) {
+                stopIndex = Math.min(stopIndex, 
getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), 
stopKey));
+            }
+        }
+        List<List<Scan>> parallelScans = 
Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
+        
+        int guideIndex = currentKey.length == 0 ? 0 : 
getIndexContainingInclusive(gps, currentKey);
+        int gpsSize = gps.size();
+        int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / 
regionLocations.size() + 1;
+        int keyOffset = 0;
+        // Merge bisect with guideposts for all but the last region
+        while (regionIndex <= stopIndex) {
+            byte[] currentGuidePost;
+            byte[] endKey = regionIndex == stopIndex ? stopKey : 
regionBoundaries.get(regionIndex);
+            if (isLocalIndex) {
+                HRegionInfo regionInfo = 
regionLocations.get(regionIndex).getRegionInfo();
+                keyOffset = regionInfo.getStartKey().length > 0 ? 
regionInfo.getStartKey().length : regionInfo.getEndKey().length;
+            }
+            List<Scan> scans = 
Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
+            while (guideIndex < gpsSize
+                    && (Bytes.compareTo(currentGuidePost = 
gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) {
+                Scan newScan = scanRanges.intersect(scan, currentKey, 
currentGuidePost, keyOffset);
+                if (newScan != null) {
+                    scans.add(newScan);
+                }
+                currentKey = currentGuidePost;
+                guideIndex++;
+            }
+            Scan newScan = scanRanges.intersect(scan, currentKey, endKey, 
keyOffset);
+            if (newScan != null) {
+                scans.add(newScan);
+            }
+            if (!scans.isEmpty()) {
+                parallelScans.add(scans);
+            }
+            currentKey = endKey;
+            regionIndex++;
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug(LogUtil.addCustomAnnotations("The parallelScans: " + 
parallelScans,
+                    ScanUtil.getCustomAnnotations(scan)));
+        }
+        return parallelScans;
+    }
+
+    private static void addConcatResultIterator(List<PeekingResultIterator> 
iterators, final List<PeekingResultIterator> concatIterators) {
+        if (!concatIterators.isEmpty()) {
+            if (concatIterators.size() == 1) {
+                iterators.add(concatIterators.get(0));
+            } else {
+                // TODO: should ConcatResultIterator have a constructor that 
takes
+                // a List<PeekingResultIterator>?
+                iterators.add(new ConcatResultIterator(new ResultIterators() {
+    
+                    @Override
+                    public List<PeekingResultIterator> getIterators() throws 
SQLException {
+                        return concatIterators;
+                    }
+    
+                    @Override
+                    public int size() {
+                        return concatIterators.size();
+                    }
+    
+                    @Override
+                    public void explain(List<String> planSteps) {
+                        // TODO: review what we should for explain plan here
+                        concatIterators.get(0).explain(planSteps);
+                    }
+                    
+                }));
+            }
+        }
+    }
     /**
      * Executes the scan in parallel across all regions, blocking until all 
scans are complete.
      * @return the result iterators for the scan of each region
@@ -258,51 +433,60 @@ public class ParallelIterators extends ExplainTable 
implements ResultIterators {
         boolean success = false;
         final ConnectionQueryServices services = 
context.getConnection().getQueryServices();
         ReadOnlyProps props = services.getProps();
-        int numSplits = splits.size();
+        int numSplits = size();
         List<PeekingResultIterator> iterators = new 
ArrayList<PeekingResultIterator>(numSplits);
-        List<Pair<KeyRange,Future<PeekingResultIterator>>> futures = new 
ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(numSplits);
+        List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = 
Lists.newArrayListWithExpectedSize(numSplits);
         final UUID scanId = UUID.randomUUID();
         try {
-            submitWork(scanId, splits, futures);
+            submitWork(scanId, scans, futures);
             int timeoutMs = 
props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
-            final int factor = ScanUtil.isReversed(this.context.getScan()) ? 
-1 : 1;
-            // Sort futures by row key so that we have a predictable order 
we're getting rows back for scans.
-            // We're going to wait here until they're finished anyway and this 
makes testing much easier.
-            Collections.sort(futures, new 
Comparator<Pair<KeyRange,Future<PeekingResultIterator>>>() {
-                @Override
-                public int compare(Pair<KeyRange, 
Future<PeekingResultIterator>> o1, Pair<KeyRange, 
Future<PeekingResultIterator>> o2) {
-                    return factor * 
Bytes.compareTo(o1.getFirst().getLowerRange(), o2.getFirst().getLowerRange());
-                }
-            });
             boolean clearedCache = false;
             byte[] tableName = 
tableRef.getTable().getPhysicalName().getBytes();
-            for (Pair<KeyRange,Future<PeekingResultIterator>> future : 
futures) {
-                try {
-                    PeekingResultIterator iterator = 
future.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
-                    iterators.add(iterator);
-                } catch (ExecutionException e) {
-                    try { // Rethrow as SQLException
-                        throw ServerUtil.parseServerException(e);
-                    } catch (StaleRegionBoundaryCacheException e2) { // Catch 
only to try to recover from region boundary cache being out of date
-                        List<Pair<KeyRange,Future<PeekingResultIterator>>> 
newFutures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(2);
-                        if (!clearedCache) { // Clear cache once so that we 
rejigger job based on new boundaries
-                            services.clearTableRegionCache(tableName);
-                            clearedCache = true;
-                        }
-                        List<KeyRange> allSplits = 
toKeyRanges(services.getAllTableRegions(tableName));
-                        // Intersect what was the expected boundary with all 
new region boundaries and
-                        // resubmit just this portion of work again
-                        List<KeyRange> newSubSplits = 
KeyRange.intersect(Collections.singletonList(future.getFirst()), allSplits);
-                        submitWork(scanId, newSubSplits, newFutures);
-                        for (Pair<KeyRange,Future<PeekingResultIterator>> 
newFuture : newFutures) {
-                            // Immediate do a get (not catching exception 
again) and then add the iterators we
-                            // get back immediately. They'll be sorted as 
expected, since they're replacing the
-                            // original one.
-                            PeekingResultIterator iterator = 
newFuture.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
-                            iterators.add(iterator);
+            for (List<Pair<Scan,Future<PeekingResultIterator>>> future : 
futures) {
+                List<PeekingResultIterator> concatIterators = 
Lists.newArrayListWithExpectedSize(future.size());
+                for (Pair<Scan,Future<PeekingResultIterator>> scanPair : 
future) {
+                    try {
+                        PeekingResultIterator iterator = 
scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                        concatIterators.add(iterator);
+                    } catch (ExecutionException e) {
+                        try { // Rethrow as SQLException
+                            throw ServerUtil.parseServerException(e);
+                        } catch (StaleRegionBoundaryCacheException e2) { // 
Catch only to try to recover from region boundary cache being out of date
+                            
List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = 
Lists.newArrayListWithExpectedSize(2);
+                            if (!clearedCache) { // Clear cache once so that 
we rejigger job based on new boundaries
+                                services.clearTableRegionCache(tableName);
+                                clearedCache = true;
+                            }
+                            List<KeyRange> allSplits = 
toKeyRanges(services.getAllTableRegions(tableName));
+                            // Intersect what was the expected boundary with 
all new region boundaries and
+                            // resubmit just this portion of work again
+                            Scan oldScan = scanPair.getFirst();
+                            List<KeyRange> newSubSplits = 
KeyRange.intersect(Collections.singletonList(KeyRange.getKeyRange(oldScan.getStartRow(),
 oldScan.getStopRow())), allSplits);
+                            List<List<Scan>> newNestedScans = 
Lists.newArrayListWithExpectedSize(2);
+                            for (KeyRange newSubSplit : newSubSplits) {
+                                Scan newScan = 
ScanUtil.newScan(scanPair.getFirst());
+                                
newScan.setStartRow(newSubSplit.getLowerRange());
+                                
newScan.setStopRow(newSubSplit.getUpperRange());
+                                
newNestedScans.add(Collections.singletonList(newScan));
+                            }
+                            // Add any concatIterators that were successful so 
far
+                            // as we need these to be in order
+                            addConcatResultIterator(iterators, 
concatIterators);
+                            concatIterators = Collections.emptyList();
+                            submitWork(scanId, newNestedScans, newFutures);
+                            for 
(List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : newFutures) {
+                                for (Pair<Scan,Future<PeekingResultIterator>> 
newScanPair : newFuture) {
+                                    // Immediate do a get (not catching 
exception again) and then add the iterators we
+                                    // get back immediately. They'll be sorted 
as expected, since they're replacing the
+                                    // original one.
+                                    PeekingResultIterator iterator = 
newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                                    iterators.add(iterator);
+                                }
+                            }
                         }
                     }
                 }
+                addConcatResultIterator(iterators, concatIterators);
             }
 
             success = true;
@@ -322,70 +506,79 @@ public class ParallelIterators extends ExplainTable 
implements ResultIterators {
         }
     }
     
-    private void submitWork(final UUID scanId, List<KeyRange> splits,
-            List<Pair<KeyRange,Future<PeekingResultIterator>>> futures) {
+    private static final class ScanLocation {
+       private final int outerListIndex;
+       private final int innerListIndex;
+       private final Scan scan;
+       public ScanLocation(Scan scan, int outerListIndex, int innerListIndex) {
+               this.outerListIndex = outerListIndex;
+               this.innerListIndex = innerListIndex;
+               this.scan = scan;
+       }
+       public int getOuterListIndex() {
+               return outerListIndex;
+       }
+       public int getInnerListIndex() {
+               return innerListIndex;
+       }
+       public Scan getScan() {
+               return scan;
+       }
+    }
+    private void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
+            List<List<Pair<Scan,Future<PeekingResultIterator>>>> 
nestedFutures) {
         final ConnectionQueryServices services = 
context.getConnection().getQueryServices();
         ExecutorService executor = services.getExecutor();
-        final boolean localIndex = this.tableRef.getTable().getType() == 
PTableType.INDEX && this.tableRef.getTable().getIndexType() == IndexType.LOCAL;
-        for (final KeyRange split : splits) {
-            final Scan splitScan = ScanUtil.newScan(context.getScan());
-            // Intersect with existing start/stop key if the table is salted
-            // If not salted, we've already intersected it. If salted, we need
-            // to wait until now to intersect, as we're running parallel scans
-            // on all the possible regions here.
-            if (tableRef.getTable().getBucketNum() != null) {
-                KeyRange minMaxRange = context.getMinMaxRange();
-                if (minMaxRange != null) {
-                    // Add salt byte based on current split, as minMaxRange 
won't have it
-                    minMaxRange = 
SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange);
-                    // FIXME: seems like this should be possible when we set 
the scan start/stop
-                    // in StatementContext.setScanRanges(). If it doesn't 
intersect the range for
-                    // one salt byte, I don't see how it could intersect it 
with any of them.
-                    if (!ScanUtil.intersectScanRange(splitScan, 
minMaxRange.getLowerRange(), minMaxRange.getUpperRange())) {
-                        continue; // Skip this chunk if no intersection based 
on minMaxRange
-                    }
-                }
-            } else if (localIndex) {
-                // Used to detect stale region boundary information on server 
side
-                splitScan.setAttribute(EXPECTED_UPPER_REGION_KEY, 
split.getUpperRange());
-                if (splitScan.getStartRow().length != 0 || 
splitScan.getStopRow().length != 0) {
-                    
SaltingUtil.addRegionStartKeyToScanStartAndStopRows(split.getLowerRange(),split.getUpperRange(),
-                        splitScan);
-                }
-            } 
-            if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), 
split.getUpperRange(), this.context.getScanRanges().useSkipScanFilter())) {
-                Future<PeekingResultIterator> future =
-                    executor.submit(Tracing.wrap(new 
JobCallable<PeekingResultIterator>() {
+        // Pre-populate nestedFutures lists so that we can shuffle the scans
+        // and add the future to the right nested list. By shuffling the scans
+        // we get better utilization of the cluster since our thread executor
+        // will spray the scans across machines as opposed to targeting a
+        // single one since the scans are in row key order.
+        List<ScanLocation> scanLocations = 
Lists.newArrayListWithExpectedSize(splits.size());
+        for (int i = 0; i < nestedScans.size(); i++) {
+            List<Pair<Scan,Future<PeekingResultIterator>>> futures = 
Lists.newArrayListWithExpectedSize(scans.size());
+            nestedFutures.add(futures);
+            for (int j = 0; j < nestedScans.get(i).size(); j++) {
+               Scan scan = nestedScans.get(i).get(j);
+                scanLocations.add(new ScanLocation(scan, i, j));
+                futures.add(null); // placeholder
+            }
+        }
+        Collections.shuffle(scanLocations);
+        for (ScanLocation scanLocation : scanLocations) {
+            final Scan scan = scanLocation.getScan();
+            Future<PeekingResultIterator> future =
+                executor.submit(Tracing.wrap(new 
JobCallable<PeekingResultIterator>() {
 
-                    @Override
-                    public PeekingResultIterator call() throws Exception {
-                        long startTime = System.currentTimeMillis();
-                        ResultIterator scanner = new 
TableResultIterator(context, tableRef, splitScan);
-                        if (logger.isDebugEnabled()) {
-                            logger.debug(LogUtil.addCustomAnnotations("Id: " + 
scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + 
splitScan, ScanUtil.getCustomAnnotations(splitScan)));
-                        }
-                        return iteratorFactory.newIterator(context, scanner, 
splitScan);
+                @Override
+                public PeekingResultIterator call() throws Exception {
+                    long startTime = System.currentTimeMillis();
+                    ResultIterator scanner = new TableResultIterator(context, 
tableRef, scan);
+                    if (logger.isDebugEnabled()) {
+                        logger.debug(LogUtil.addCustomAnnotations("Id: " + 
scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + 
scan, ScanUtil.getCustomAnnotations(scan)));
                     }
+                    return iteratorFactory.newIterator(context, scanner, scan);
+                }
 
-                    /**
-                     * Defines the grouping for round robin behavior.  All 
threads spawned to process
-                     * this scan will be grouped together and time sliced with 
other simultaneously
-                     * executing parallel scans.
-                     */
-                    @Override
-                    public Object getJobId() {
-                        return ParallelIterators.this;
-                    }
-                }, "Parallel scanner for table: " + 
tableRef.getTable().getName().getString()));
-                futures.add(new 
Pair<KeyRange,Future<PeekingResultIterator>>(split,future));
-            }
+                /**
+                 * Defines the grouping for round robin behavior.  All threads 
spawned to process
+                 * this scan will be grouped together and time sliced with 
other simultaneously
+                 * executing parallel scans.
+                 */
+                @Override
+                public Object getJobId() {
+                    return ParallelIterators.this;
+                }
+            }, "Parallel scanner for table: " + 
tableRef.getTable().getName().getString()));
+            // Add our future in the right place so that we can concatenate the
+            // results of the inner futures versus merge sorting across all of 
them.
+            
nestedFutures.get(scanLocation.getOuterListIndex()).set(scanLocation.getInnerListIndex(),
 new Pair<Scan,Future<PeekingResultIterator>>(scan,future));
         }
-
     }
 
     @Override
     public int size() {
-        return this.splits.size();
+        return this.scans.size();
     }
 
     @Override
@@ -397,6 +590,6 @@ public class ParallelIterators extends ExplainTable 
implements ResultIterators {
 
        @Override
        public String toString() {
-               return "ParallelIterators [splits=" + splits + "]";
+               return "ParallelIterators [scans=" + scans + "]";
        }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5d07cc0/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index fc79173..f0a9d88 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -264,7 +264,7 @@ public class ScanUtil {
 
     private static byte[] getKey(RowKeySchema schema, List<List<KeyRange>> 
slots, int[] slotSpan, Bound bound) {
         if (slots.isEmpty()) {
-            return null;
+            return KeyRange.UNBOUND;
         }
         int[] position = new int[slots.size()];
         int maxLength = 0;
@@ -276,7 +276,7 @@ public class ScanUtil {
         byte[] key = new byte[maxLength];
         int length = setKey(schema, slots, slotSpan, position, bound, key, 0, 
0, position.length);
         if (length == 0) {
-            return null;
+            return KeyRange.UNBOUND;
         }
         if (length == maxLength) {
             return key;

Reply via email to