Repository: phoenix
Updated Branches:
  refs/heads/4.0 f7e6a6c7e -> 51f69bcb6
PHOENIX-1251 Salted queries with range scan become full table scans

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a5d07cc0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a5d07cc0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a5d07cc0

Branch: refs/heads/4.0
Commit: a5d07cc076763000f0c48c4c958e33aa578e85a9
Parents: 846ed10
Author: James Taylor <jtay...@salesforce.com>
Authored: Wed Oct 1 08:49:04 2014 -0700
Committer: James Taylor <jtay...@salesforce.com>
Committed: Wed Oct 1 08:49:04 2014 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/InListIT.java    |   2 +-
 .../org/apache/phoenix/compile/ScanRanges.java  | 239 ++++++++++--
 .../phoenix/compile/StatementContext.java       |  21 +-
 .../apache/phoenix/compile/WhereOptimizer.java  |  33 +-
 .../phoenix/index/PhoenixIndexBuilder.java      |   1 -
 .../phoenix/iterate/ParallelIterators.java      | 391 ++++++++++++++-----
 .../java/org/apache/phoenix/util/ScanUtil.java  |   4 +-
 7 files changed, 530 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5d07cc0/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
index dc60b69..60bcb65 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
@@ -163,7 +163,7 @@ public class InListIT extends BaseHBaseManagedTimeIT {
     // the different combinations to check each test against
     private static final List<Boolean> TENANCIES = Arrays.asList(false, true);
     private static final List<PDataType> INTEGER_TYPES = Arrays.asList(PDataType.INTEGER, PDataType.LONG);
-    private static final List<Integer> SALT_BUCKET_NUMBERS = Arrays.asList(0, 4);
+    private static final List<Integer> SALT_BUCKET_NUMBERS = Arrays.asList(/*0,*/ 4);
 
     // we should be including the RANGE_SCAN hint here, but a bug with ParallelIterators causes tests to fail
     // see the relevant JIRA here: https://issues.apache.org/jira/browse/PHOENIX-1251


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5d07cc0/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index dc8e0b3..1c739f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -23,12 +23,17 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.KeyRange.Bound;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -40,8 +45,8 @@ import com.google.common.collect.Lists;
 public class ScanRanges {
     private static final List<List<KeyRange>> EVERYTHING_RANGES = Collections.<List<KeyRange>>emptyList();
     private static final List<List<KeyRange>> NOTHING_RANGES = Collections.<List<KeyRange>>singletonList(Collections.<KeyRange>singletonList(KeyRange.EMPTY_RANGE));
-    public static final ScanRanges EVERYTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,EVERYTHING_RANGES, false, false);
-    public static final ScanRanges NOTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,NOTHING_RANGES, false, false);
+    public static final ScanRanges EVERYTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,EVERYTHING_RANGES, KeyRange.EVERYTHING_RANGE, false, false, null);
+    public static final ScanRanges NOTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,NOTHING_RANGES, KeyRange.EMPTY_RANGE, false, false, null);
 
     public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> ranges, int[] slotSpan) {
         return create(schema, ranges, slotSpan, false, null);
@@ -72,37 +77,210 @@
                 // when there's a single key.
                 slotSpan = new int[] {schema.getMaxFields()-1};
             }
-        } else if (nBuckets != null) {
+        } /*else if (nBuckets != null) {
             List<List<KeyRange>> saltedRanges = Lists.newArrayListWithExpectedSize(ranges.size());
             saltedRanges.add(SaltingUtil.generateAllSaltingRanges(nBuckets));
             saltedRanges.addAll(ranges.subList(1, ranges.size()));
             ranges = saltedRanges;
+        }*/
+        List<List<KeyRange>> sortedRanges = Lists.newArrayListWithExpectedSize(ranges.size());
+        for (int i = 0; i < ranges.size(); i++) {
+            List<KeyRange> sorted = Lists.newArrayList(ranges.get(i));
+            Collections.sort(sorted, KeyRange.COMPARATOR);
+            sortedRanges.add(ImmutableList.copyOf(sorted));
         }
-        return new ScanRanges(schema, slotSpan, ranges, forceRangeScan, isPointLookup);
+        // Don't set minMaxRange for point lookup because it causes issues during intersect
+        // by us ignoring the salt byte
+        KeyRange minMaxRange = isPointLookup ? KeyRange.EVERYTHING_RANGE : calculateMinMaxRange(schema, slotSpan, sortedRanges);
+        return new ScanRanges(schema, slotSpan, sortedRanges, minMaxRange, forceRangeScan, isPointLookup, nBuckets);
     }
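A note for readers following the change: create() now derives a single minMaxRange up front via calculateMinMaxRange (just below). The underlying idea, reduced to a standalone sketch with hypothetical types, is that the overall start key concatenates each slot's smallest lower bound and the stop key concatenates each slot's largest upper bound. Phoenix's ScanUtil.getMinKey/getMaxKey additionally handle separators, slot spans, and exclusive bounds, all of which this sketch ignores.

    import java.io.ByteArrayOutputStream;
    import java.util.List;

    // Standalone sketch (not Phoenix's API): each slot holds sorted ranges,
    // each range a {lower, upper} pair of byte[].
    public class MinMaxKeySketch {
        static byte[] concatBounds(List<List<byte[][]>> slots, boolean lower) {
            ByteArrayOutputStream key = new ByteArrayOutputStream();
            for (List<byte[][]> slot : slots) {
                byte[][] range = lower ? slot.get(0) : slot.get(slot.size() - 1);
                byte[] bound = lower ? range[0] : range[1];
                if (bound.length == 0) break; // unbound slot: stop extending the key
                key.writeBytes(bound);
            }
            return key.toByteArray();
        }

        public static void main(String[] args) {
            List<List<byte[][]>> slots = List.of(
                List.of(new byte[][] { {1}, {3} }),
                List.of(new byte[][] { {5}, {6} }, new byte[][] { {8}, {9} }));
            System.out.println(java.util.Arrays.toString(concatBounds(slots, true)));  // [1, 5]
            System.out.println(java.util.Arrays.toString(concatBounds(slots, false))); // [3, 9]
        }
    }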
+    private static KeyRange calculateMinMaxRange(RowKeySchema schema, int[] slotSpan, List<List<KeyRange>> ranges) {
+        byte[] minKey = ScanUtil.getMinKey(schema, ranges, slotSpan);
+        byte[] maxKey = ScanUtil.getMaxKey(schema, ranges, slotSpan);
+        return KeyRange.getKeyRange(minKey, maxKey);
+    }
+
     private SkipScanFilter filter;
     private final List<List<KeyRange>> ranges;
     private final int[] slotSpan;
     private final RowKeySchema schema;
-    private final boolean forceRangeScan;
     private final boolean isPointLookup;
+    private final boolean isSalted;
+    private final boolean useSkipScanFilter;
+    private final KeyRange minMaxRange;
 
-    private ScanRanges (RowKeySchema schema, int[] slotSpan, List<List<KeyRange>> ranges, boolean forceRangeScan, boolean isPointLookup) {
+    private ScanRanges (RowKeySchema schema, int[] slotSpan, List<List<KeyRange>> ranges, KeyRange minMaxRange, boolean forceRangeScan, boolean isPointLookup, Integer bucketNum) {
         this.isPointLookup = isPointLookup;
-        List<List<KeyRange>> sortedRanges = Lists.newArrayListWithExpectedSize(ranges.size());
-        for (int i = 0; i < ranges.size(); i++) {
-            List<KeyRange> sorted = Lists.newArrayList(ranges.get(i));
-            Collections.sort(sorted, KeyRange.COMPARATOR);
-            sortedRanges.add(ImmutableList.copyOf(sorted));
+        this.isSalted = bucketNum != null;
+        this.minMaxRange = minMaxRange;
+        this.useSkipScanFilter = useSkipScanFilter(forceRangeScan, isPointLookup, ranges);
+
+        // Only blow out the bucket values if we're using the skip scan. We need all the
+        // bucket values in this case because we use intersect against a key that may have
+        // any of the possible bucket values. Otherwise, we can pretty easily ignore the
+        // bucket values.
+        if (useSkipScanFilter && isSalted && !isPointLookup) {
+            ranges.set(0, SaltingUtil.generateAllSaltingRanges(bucketNum));
         }
-        this.ranges = ImmutableList.copyOf(sortedRanges);
+        this.ranges = ImmutableList.copyOf(ranges);
         this.slotSpan = slotSpan;
         this.schema = schema;
-        if (schema != null && !ranges.isEmpty()) {
+        if (schema != null && !ranges.isEmpty()) { // TODO: only create if useSkipScanFilter is true?
             this.filter = new SkipScanFilter(this.ranges, slotSpan, schema);
         }
-        this.forceRangeScan = forceRangeScan;
+    }
+
+    private static byte[] prefixKey(byte[] key, int keyOffset, byte[] prefixKey, int prefixKeyOffset) {
+        if (key.length > 0) {
+            byte[] newKey = new byte[key.length + prefixKeyOffset];
+            int totalKeyOffset = keyOffset + prefixKeyOffset;
+            System.arraycopy(prefixKey, 0, newKey, 0, totalKeyOffset);
+            System.arraycopy(key, keyOffset, newKey, totalKeyOffset, key.length - keyOffset);
+            return newKey;
+        }
+        return key;
+    }
+
+    private static byte[] replaceSaltByte(byte[] key, byte[] saltKey) {
+        if (key.length == 0) {
+            return key;
+        }
+        byte[] temp = new byte[key.length];
+        System.arraycopy(saltKey, 0, temp, 0, SaltingUtil.NUM_SALTING_BYTES);
+        System.arraycopy(key, SaltingUtil.NUM_SALTING_BYTES, temp, SaltingUtil.NUM_SALTING_BYTES, key.length - SaltingUtil.NUM_SALTING_BYTES);
+        return temp;
+    }
+
+    private static byte[] stripLocalIndexPrefix(byte[] key, int keyOffset) {
+        if (key.length == 0) {
+            return key;
+        }
+        byte[] temp = new byte[key.length - keyOffset];
+        System.arraycopy(key, keyOffset, temp, 0, key.length - keyOffset);
+        return temp;
+    }
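The prefixKey/replaceSaltByte/stripLocalIndexPrefix helpers above are plain System.arraycopy surgery on row keys. A self-contained illustration of the salt-byte swap, assuming Phoenix's single salt byte (hypothetical class, not part of the commit):

    import java.util.Arrays;

    // Sketch of the salt-byte swap: copy the salt header from one key and
    // the remainder from another. NUM_SALT_BYTES = 1 mirrors Phoenix.
    public class SaltSwapSketch {
        static final int NUM_SALT_BYTES = 1;

        static byte[] replaceSalt(byte[] key, byte[] saltSource) {
            if (key.length == 0) {
                return key; // empty means unbound; nothing to patch
            }
            byte[] out = new byte[key.length];
            System.arraycopy(saltSource, 0, out, 0, NUM_SALT_BYTES);
            System.arraycopy(key, NUM_SALT_BYTES, out, NUM_SALT_BYTES, key.length - NUM_SALT_BYTES);
            return out;
        }

        public static void main(String[] args) {
            byte[] key = {0x00, 'a', 'b', 'c'};   // salt 0x00 + row key "abc"
            byte[] regionStart = {0x07, 'x'};     // region whose salt byte is 0x07
            System.out.println(Arrays.toString(replaceSalt(key, regionStart)));
            // -> [7, 97, 98, 99]: same row key, re-homed under salt bucket 7
        }
    }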
+    public Scan intersect(Scan scan, final byte[] originalStartKey, final byte[] originalStopKey, final int keyOffset) {
+        byte[] startKey = originalStartKey;
+        byte[] stopKey = originalStopKey;
+        boolean mayHaveRows = false;
+        final int scanKeyOffset = this.isSalted ? SaltingUtil.NUM_SALTING_BYTES : 0;
+        // Offset for startKey/stopKey. Either 1 for salted tables or the prefix length
+        // of the current region for local indexes.
+        final int totalKeyOffset = scanKeyOffset + keyOffset;
+        // In this case, we've crossed the "prefix" boundary and should consider everything after the startKey
+        // This prevents us from having to prefix the key prior to knowing whether or not there may be an
+        // intersection.
+        byte[] prefixBytes = ByteUtil.EMPTY_BYTE_ARRAY;
+        if (totalKeyOffset > 0) {
+            prefixBytes = startKey.length > 0 ? startKey : (this.isSalted ? QueryConstants.SEPARATOR_BYTE_ARRAY : stopKey);
+        }
+        if (stopKey.length < totalKeyOffset || Bytes.compareTo(prefixBytes, 0, totalKeyOffset, stopKey, 0, totalKeyOffset) != 0) {
+            stopKey = ByteUtil.EMPTY_BYTE_ARRAY;
+        }
+        assert (scanKeyOffset == 0 || keyOffset == 0);
+        int scanStartKeyOffset = scanKeyOffset;
+        byte[] scanStartKey = scan.getStartRow();
+        // Compare ignoring key prefix and salt byte
+        if (scanStartKey.length > 0) {
+            if (startKey.length > 0 && Bytes.compareTo(scanStartKey, scanKeyOffset, scanStartKey.length - scanKeyOffset, startKey, totalKeyOffset, startKey.length - totalKeyOffset) < 0) {
+                scanStartKey = startKey;
+                scanStartKeyOffset = totalKeyOffset;
+            }
+        } else {
+            scanStartKey = startKey;
+            scanStartKeyOffset = totalKeyOffset;
+            mayHaveRows = true;
+        }
+        int scanStopKeyOffset = scanKeyOffset;
+        byte[] scanStopKey = scan.getStopRow();
+        if (scanStopKey.length > 0) {
+            if (stopKey.length > 0 && Bytes.compareTo(scanStopKey, scanKeyOffset, scanStopKey.length - scanKeyOffset, stopKey, totalKeyOffset, stopKey.length - totalKeyOffset) > 0) {
+                scanStopKey = stopKey;
+                scanStopKeyOffset = totalKeyOffset;
+            }
+        } else {
+            scanStopKey = stopKey;
+            scanStopKeyOffset = totalKeyOffset;
+            mayHaveRows = true;
+        }
+        mayHaveRows = mayHaveRows || Bytes.compareTo(scanStartKey, scanStartKeyOffset, scanStartKey.length - scanStartKeyOffset, scanStopKey, scanStopKeyOffset, scanStopKey.length - scanStopKeyOffset) < 0;
+
+        if (!mayHaveRows) {
+            return null;
+        }
+        if (originalStopKey.length != 0 && scanStopKey.length == 0) {
+            scanStopKey = originalStopKey;
+        }
+        Filter newFilter = scan.getFilter();
+        // If the scan is using skip scan filter, intersect and replace the filter.
+        if (this.useSkipScanFilter()) {
+            byte[] skipScanStartKey = scanStartKey;
+            byte[] skipScanStopKey = scanStopKey;
+            // If we have a keyOffset and we've used the startKey/stopKey that
+            // were passed in (which have the prefix) for the above range check,
+            // we need to remove the prefix before running our intersect method.
+            // TODO: we could use skipScanFilter.setOffset(keyOffset) if both
+            // the startKey and stopKey were used above *and* our intersect
+            // method honored the skipScanFilter.offset variable.
+            if (scanKeyOffset > 0) {
+                if (skipScanStartKey != originalStartKey) { // original already has correct salt byte
+                    skipScanStartKey = replaceSaltByte(skipScanStartKey, prefixBytes);
+                }
+                if (skipScanStopKey != originalStopKey) {
+                    skipScanStopKey = replaceSaltByte(skipScanStopKey, prefixBytes);
+                }
+            } else if (keyOffset > 0) {
+                if (skipScanStartKey == originalStartKey) {
+                    skipScanStartKey = stripLocalIndexPrefix(skipScanStartKey, keyOffset);
+                }
+                if (skipScanStopKey == originalStopKey) {
+                    skipScanStopKey = stripLocalIndexPrefix(skipScanStopKey, keyOffset);
+                }
+            }
+            Filter filter = scan.getFilter();
+            if (filter instanceof SkipScanFilter) {
+                SkipScanFilter oldSkipScanFilter = (SkipScanFilter)filter;
+                newFilter = oldSkipScanFilter.intersect(skipScanStartKey, skipScanStopKey);
+                if (newFilter == null) {
+                    return null;
+                }
+            } else if (filter instanceof FilterList) {
+                FilterList oldList = (FilterList)filter;
+                FilterList newList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+                newFilter = newList;
+                for (Filter f : oldList.getFilters()) {
+                    if (f instanceof SkipScanFilter) {
+                        SkipScanFilter newSkipScanFilter = ((SkipScanFilter)f).intersect(skipScanStartKey, skipScanStopKey);
+                        if (newSkipScanFilter == null) {
+                            return null;
+                        }
+                        newList.addFilter(newSkipScanFilter);
+                    } else {
+                        newList.addFilter(f);
+                    }
+                }
+            }
+        }
+        Scan newScan = ScanUtil.newScan(scan);
+        newScan.setFilter(newFilter);
+        // If we have an offset (salted table or local index), we need to make sure to
+        // prefix our scan start/stop row by the prefix of the startKey or stopKey that
+        // were passed in. Our scan either doesn't have the prefix or has a placeholder
+        // for it.
+        if (totalKeyOffset > 0) {
+            if (scanStartKey != originalStartKey) {
+                scanStartKey = prefixKey(scanStartKey, scanKeyOffset, prefixBytes, keyOffset);
+            }
+            if (scanStopKey != originalStopKey) {
+                scanStopKey = prefixKey(scanStopKey, scanKeyOffset, prefixBytes, keyOffset);
+            }
+        }
+        newScan.setStartRow(scanStartKey);
+        newScan.setStopRow(scanStopKey);
+
+        return newScan;
+    }
 
     public SkipScanFilter getSkipScanFilter() {
@@ -132,11 +310,15 @@
      * not the last key slot
      */
     public boolean useSkipScanFilter() {
+        return useSkipScanFilter;
+    }
+
+    private static boolean useSkipScanFilter(boolean forceRangeScan, boolean isPointLookup, List<List<KeyRange>> ranges) {
         if (forceRangeScan) {
             return false;
         }
         if (isPointLookup) {
-            return getPointLookupCount() > 1;
+            return getPointLookupCount(isPointLookup, ranges) > 1;
         }
         boolean hasRangeKey = false, useSkipScan = false;
         for (List<KeyRange> orRanges : ranges) {
@@ -208,6 +390,10 @@
     }
 
     public int getPointLookupCount() {
+        return getPointLookupCount(isPointLookup, ranges);
+    }
+
+    private static int getPointLookupCount(boolean isPointLookup, List<List<KeyRange>> ranges) {
         return isPointLookup ? ranges.get(0).size() : 0;
     }
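The core of intersect() above is clamping one [startRow, stopRow) key range by another, with a zero-length array standing in for an unbounded edge. That clamping rule, as a standalone JDK-only sketch (the real method additionally juggles salt/prefix offsets and rebuilds skip-scan filters):

    import java.util.Arrays;

    // Sketch: intersect two half-open row-key ranges. A zero-length array
    // means "unbounded" on that side, matching HBase's empty start/stop row.
    public class RangeIntersectSketch {
        static byte[][] intersect(byte[] aStart, byte[] aStop, byte[] bStart, byte[] bStop) {
            // take the larger start (empty = minus infinity) ...
            byte[] start = aStart.length == 0 ? bStart
                    : bStart.length == 0 ? aStart
                    : Arrays.compareUnsigned(aStart, bStart) >= 0 ? aStart : bStart;
            // ... and the smaller stop (empty = plus infinity)
            byte[] stop = aStop.length == 0 ? bStop
                    : bStop.length == 0 ? aStop
                    : Arrays.compareUnsigned(aStop, bStop) <= 0 ? aStop : bStop;
            // no rows possible when both bounds exist and start >= stop
            if (start.length > 0 && stop.length > 0 && Arrays.compareUnsigned(start, stop) >= 0) {
                return null;
            }
            return new byte[][] { start, stop };
        }

        public static void main(String[] args) {
            byte[][] r = intersect(new byte[]{1}, new byte[]{9}, new byte[]{5}, new byte[0]);
            System.out.println(Arrays.toString(r[0]) + " .. " + Arrays.toString(r[1])); // [5] .. [9]
            System.out.println(intersect(new byte[]{5}, new byte[]{9}, new byte[]{9}, new byte[0]) == null); // true
        }
    }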
@@ -215,27 +401,10 @@
         return isPointLookup ? ranges.get(0).iterator() : Iterators.<KeyRange>emptyIterator();
     }
 
-    public void setScanStartStopRow(Scan scan) {
-        if (isEverything()) {
-            return;
-        }
-        if (isDegenerate()) {
-            scan.setStartRow(KeyRange.EMPTY_RANGE.getLowerRange());
-            scan.setStopRow(KeyRange.EMPTY_RANGE.getUpperRange());
-            return;
-        }
-
-        byte[] expectedKey;
-        expectedKey = ScanUtil.getMinKey(schema, ranges, slotSpan);
-        if (expectedKey != null) {
-            scan.setStartRow(expectedKey);
-        }
-        expectedKey = ScanUtil.getMaxKey(schema, ranges, slotSpan);
-        if (expectedKey != null) {
-            scan.setStopRow(expectedKey);
-        }
+    public KeyRange getMinMaxRange() {
+        return minMaxRange;
     }
-
+
     public static final ImmutableBytesWritable UNBOUND = new ImmutableBytesWritable(KeyRange.UNBOUND);
 
     /**


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5d07cc0/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 56e63ae..90264bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -192,14 +192,14 @@ public class StatementContext {
     }
 
     public void setScanRanges(ScanRanges scanRanges) {
-        setScanRanges(scanRanges, null);
+        setScanRanges(scanRanges, KeyRange.EVERYTHING_RANGE);
     }
 
     public void setScanRanges(ScanRanges scanRanges, KeyRange minMaxRange) {
         this.scanRanges = scanRanges;
-        this.scanRanges.setScanStartStopRow(scan);
-        PTable table = this.getCurrentTable().getTable();
-        if (minMaxRange != null) {
+        KeyRange scanRange = scanRanges.getMinMaxRange();
+        if (minMaxRange != KeyRange.EVERYTHING_RANGE) {
+            PTable table = this.getCurrentTable().getTable();
             // Ensure minMaxRange is lower inclusive and upper exclusive, as that's
             // what we need to intersect against for the HBase scan.
             byte[] lowerRange = minMaxRange.getLowerRange();
@@ -216,17 +216,12 @@ public class StatementContext {
                 }
             }
             if (minMaxRange.getLowerRange() != lowerRange || minMaxRange.getUpperRange() != upperRange) {
-                minMaxRange = KeyRange.getKeyRange(lowerRange, true, upperRange, false);
-            }
-            // If we're not salting, we can intersect this now with the scan range.
-            // Otherwise, we have to wait to do this when we chunk up the scan.
-            if (table.getBucketNum() == null) {
-                minMaxRange = minMaxRange.intersect(KeyRange.getKeyRange(scan.getStartRow(), scan.getStopRow()));
-                scan.setStartRow(minMaxRange.getLowerRange());
-                scan.setStopRow(minMaxRange.getUpperRange());
+                minMaxRange = KeyRange.getKeyRange(lowerRange, upperRange);
             }
-            this.minMaxRange = minMaxRange;
+            scanRange = scanRange.intersect(minMaxRange);
         }
+        scan.setStartRow(scanRange.getLowerRange());
+        scan.setStopRow(scanRange.getUpperRange());
     }
 
     public PhoenixConnection getConnection() {
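setScanRanges above normalizes the pushed-down minMaxRange to lower-inclusive/upper-exclusive before intersecting. For lexicographic byte keys the standard trick is appending a zero byte, since key||0x00 is the smallest key strictly greater than key. A standalone sketch of just that idea (Phoenix's ByteUtil has its own fixed-width variants):

    import java.util.Arrays;

    // Sketch: appending one 0x00 byte converts "exclusive lower" to
    // "inclusive lower" and "inclusive upper" to "exclusive upper".
    public class BoundNormalizeSketch {
        static byte[] afterKey(byte[] key) {
            return Arrays.copyOf(key, key.length + 1); // pads with 0x00
        }

        public static void main(String[] args) {
            byte[] k = {'a', 'b'};
            // (k, ...]  becomes  [k||0x00, ...)
            // [..., k]  becomes  [..., k||0x00)
            System.out.println(Arrays.toString(afterKey(k))); // [97, 98, 0]
        }
    }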
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5d07cc0/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index 51da924..23abf06 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -58,6 +58,7 @@ import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
@@ -146,14 +147,30 @@
         RowKeySchema schema = table.getRowKeySchema();
         List<List<KeyRange>> cnf = Lists.newArrayListWithExpectedSize(schema.getMaxFields());
         KeyRange minMaxRange = keySlots.getMinMaxRange();
-        boolean hasMinMaxRange = (minMaxRange != null);
+        if (minMaxRange == null) {
+            minMaxRange = KeyRange.EVERYTHING_RANGE;
+        }
+        boolean hasMinMaxRange = (minMaxRange != KeyRange.EVERYTHING_RANGE);
         int minMaxRangeOffset = 0;
         byte[] minMaxRangePrefix = null;
+        boolean isSalted = nBuckets != null;
+        boolean isMultiTenant = tenantId != null && table.isMultiTenant();
+        boolean hasViewIndex = table.getViewIndexId() != null;
+        if (hasMinMaxRange) {
+            int minMaxRangeSize = (isSalted ? SaltingUtil.NUM_SALTING_BYTES : 0)
+                    + (isMultiTenant ? tenantId.getBytes().length + 1 : 0)
+                    + (hasViewIndex ? MetaDataUtil.getViewIndexIdDataType().getByteSize() : 0);
+            minMaxRangePrefix = new byte[minMaxRangeSize];
+        }
 
         Iterator<KeyExpressionVisitor.KeySlot> iterator = keySlots.iterator();
         // Add placeholder for salt byte ranges
-        if (nBuckets != null) {
+        if (isSalted) {
             cnf.add(SALT_PLACEHOLDER);
+            if (hasMinMaxRange) {
+                System.arraycopy(SALT_PLACEHOLDER.get(0).getLowerRange(), 0, minMaxRangePrefix, minMaxRangeOffset, SaltingUtil.NUM_SALTING_BYTES);
+                minMaxRangeOffset += SaltingUtil.NUM_SALTING_BYTES;
+            }
             // Increment the pkPos, as the salt column is in the row schema
             // Do not increment the iterator, though, as there will never be
             // an expression in the keySlots for the salt column
@@ -161,13 +178,12 @@
         }
 
         // Add tenant data isolation for tenant-specific tables
-        if (tenantId != null && table.isMultiTenant()) {
+        if (isMultiTenant) {
             byte[] tenantIdBytes = tenantId.getBytes();
             KeyRange tenantIdKeyRange = KeyRange.getKeyRange(tenantIdBytes);
             cnf.add(singletonList(tenantIdKeyRange));
             if (hasMinMaxRange) {
-                minMaxRangePrefix = new byte[tenantIdBytes.length + MetaDataUtil.getViewIndexIdDataType().getByteSize() + 1];
-                System.arraycopy(tenantIdBytes, 0, minMaxRangePrefix, 0, tenantIdBytes.length);
+                System.arraycopy(tenantIdBytes, 0, minMaxRangePrefix, minMaxRangeOffset, tenantIdBytes.length);
                 minMaxRangeOffset += tenantIdBytes.length;
                 if (!schema.getField(pkPos).getDataType().isFixedWidth()) {
                     minMaxRangePrefix[minMaxRangeOffset] = QueryConstants.SEPARATOR_BYTE;
@@ -178,14 +194,11 @@
             }
         }
 
         // Add unique index ID for shared indexes on views. This ensures
         // that different indexes don't interleave.
-        if (table.getViewIndexId() != null) {
+        if (hasViewIndex) {
             byte[] viewIndexBytes = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
             KeyRange indexIdKeyRange = KeyRange.getKeyRange(viewIndexBytes);
             cnf.add(singletonList(indexIdKeyRange));
             if (hasMinMaxRange) {
-                if (minMaxRangePrefix == null) {
-                    minMaxRangePrefix = new byte[viewIndexBytes.length];
-                }
                 System.arraycopy(viewIndexBytes, 0, minMaxRangePrefix, minMaxRangeOffset, viewIndexBytes.length);
                 minMaxRangeOffset += viewIndexBytes.length;
             }
@@ -194,7 +207,7 @@
 
         // Prepend minMaxRange with fixed column values so we can properly intersect the
         // range with the other range.
-        if (minMaxRange != null) {
+        if (hasMinMaxRange) {
             minMaxRange = minMaxRange.prependRange(minMaxRangePrefix, 0, minMaxRangeOffset);
         }
         boolean forcedSkipScan = statement.getHint().hasHint(Hint.SKIP_SCAN);


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5d07cc0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index de5a9cc..6897106 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -71,7 +71,6 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
         if (maintainers.isEmpty()) return;
         Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
         ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
-        scanRanges.setScanStartStopRow(scan);
         scan.setFilter(scanRanges.getSkipScanFilter());
         HRegion region = this.env.getRegion();
         RegionScanner scanner = region.getScanner(scan);
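WhereOptimizer now sizes minMaxRangePrefix once, from whichever fixed parts apply (salt byte, tenant ID plus separator, view index ID), then appends each part at a running offset. That assembly pattern in isolation, with made-up part contents (standalone sketch):

    import java.util.Arrays;

    // Sketch of running-offset prefix assembly: size the buffer once from
    // the parts that apply, then arraycopy each part at the current offset.
    public class PrefixAssemblySketch {
        public static void main(String[] args) {
            byte[] salt = {0};                  // placeholder salt byte
            byte[] tenantId = {'t', '1'};
            byte[] separator = {0};             // variable-width field separator
            byte[] viewIndexId = {0, 0, 0, 7};

            byte[] prefix = new byte[salt.length + tenantId.length + separator.length + viewIndexId.length];
            int offset = 0;
            for (byte[] part : new byte[][] { salt, tenantId, separator, viewIndexId }) {
                System.arraycopy(part, 0, prefix, offset, part.length);
                offset += part.length;
            }
            System.out.println(Arrays.toString(prefix)); // [0, 116, 49, 0, 0, 0, 0, 7]
        }
    }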
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5d07cc0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index a2dabe3..4145def 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -17,12 +17,9 @@
  */
 package org.apache.phoenix.iterate;
 
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
-
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -36,6 +33,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
@@ -44,6 +42,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.filter.ColumnProjectionFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -55,15 +54,15 @@ import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.ViewType;
-import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SQLCloseables;
@@ -74,6 +73,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
@@ -87,6 +87,7 @@ import com.google.common.collect.Lists;
  */
 public class ParallelIterators extends ExplainTable implements ResultIterators {
     private static final Logger logger = LoggerFactory.getLogger(ParallelIterators.class);
+    private final List<List<Scan>> scans;
     private final List<KeyRange> splits;
     private final ParallelIteratorFactory iteratorFactory;
@@ -95,6 +96,7 @@
     }
 
     private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min
+    private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20;
 
     static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
         @Override
@@ -107,10 +109,8 @@
             RowProjector projector, GroupBy groupBy, Integer limit, ParallelIteratorFactory iteratorFactory)
             throws SQLException {
         super(context, tableRef, groupBy);
-        this.splits = getSplits(context, tableRef, statement.getHint());
-        this.iteratorFactory = iteratorFactory;
-        Scan scan = context.getScan();
         PTable table = tableRef.getTable();
+        Scan scan = context.getScan();
         if (projector.isProjectEmptyKeyValue()) {
             Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
             // If nothing projected into scan and we only have one column family, just allow everything
@@ -144,6 +144,18 @@
         }
 
         doColumnProjectionOptimization(context, scan, table, statement);
+
+        this.iteratorFactory = iteratorFactory;
+        // TODO: get physicalTable here if we don't have it
+        PTable physicalTable = table;
+        this.scans = getParallelScans(physicalTable);
+        List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
+        for (List<Scan> scanList : scans) {
+            for (Scan aScan : scanList) {
+                splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), aScan.getStopRow()));
+            }
+        }
+        this.splits = ImmutableList.copyOf(splitRanges);
     }
 
     private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
@@ -249,6 +261,169 @@
         return splits;
     }
 
+    private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) {
+        int nBoundaries = regionLocations.size() - 1;
+        List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries);
+        for (int i = 0; i < nBoundaries; i++) {
+            HRegionInfo regionInfo = regionLocations.get(i).getRegionInfo();
+            ranges.add(regionInfo.getEndKey());
+        }
+        return ranges;
+    }
+
+    private static int getIndexContainingInclusive(List<byte[]> boundaries, byte[] inclusiveKey) {
+        int guideIndex = Collections.binarySearch(boundaries, inclusiveKey, Bytes.BYTES_COMPARATOR);
+        // If we found an exact match, return the index+1, as the inclusiveKey will be contained
+        // in the next region (since we're matching on the end boundary).
+        guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1));
+        return guideIndex;
+    }
+
+    private static int getIndexContainingExclusive(List<byte[]> boundaries, byte[] exclusiveKey) {
+        int guideIndex = Collections.binarySearch(boundaries, exclusiveKey, Bytes.BYTES_COMPARATOR);
+        // If we found an exact match, return the index we found as the exclusiveKey won't be
+        // contained in the next region as with getIndexContainingInclusive.
+        guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : guideIndex);
+        return guideIndex;
+    }
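Both getIndexContaining* helpers above lean on Collections.binarySearch returning -(insertionPoint) - 1 on a miss. A standalone sketch of the inclusive lookup against sorted region end keys (hypothetical data; an exact hit falls into the next region because the match is on an end boundary):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    // Sketch: locate which region holds a key, given the sorted end keys of
    // all regions but the last.
    public class RegionLookupSketch {
        static int regionOfInclusiveKey(List<byte[]> endKeys, byte[] key) {
            int i = Collections.binarySearch(endKeys, key, (a, b) -> Arrays.compareUnsigned(a, b));
            return i < 0 ? -(i + 1) : i + 1;
        }

        public static void main(String[] args) {
            // three regions: (-inf, "d"), ["d", "m"), ["m", +inf)
            List<byte[]> endKeys = List.of(new byte[]{'d'}, new byte[]{'m'});
            System.out.println(regionOfInclusiveKey(endKeys, new byte[]{'b'})); // 0
            System.out.println(regionOfInclusiveKey(endKeys, new byte[]{'d'})); // 1: "d" starts region 1
            System.out.println(regionOfInclusiveKey(endKeys, new byte[]{'z'})); // 2
        }
    }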
+    private List<byte[]> getGuidePosts(PTable table) {
+        Scan scan = context.getScan();
+        boolean isPointLookup = context.getScanRanges().isPointLookup();
+        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
+        List<byte[]> gps = Collections.emptyList();
+        /*
+         * Don't use guide posts if:
+         * 1) We're doing a point lookup, as HBase is fast enough at those
+         *    to not need them to be further parallelized. TODO: perf test to verify
+         * 2) We're collecting stats, as in this case we need to scan entire
+         *    regions worth of data to track where to put the guide posts.
+         */
+        if (!isPointLookup && !ScanUtil.isAnalyzeTable(scan)) {
+            if (table.getColumnFamilies().isEmpty()) {
+                // For sure we can get the defaultCF from the table
+                return table.getGuidePosts();
+            }
+            try {
+                if (scan.getFamilyMap().size() > 0 && !scan.getFamilyMap().containsKey(defaultCF)) {
+                    // If default CF is not used in scan, use first CF referenced in scan
+                    return table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
+                }
+                // Otherwise, favor use of default CF.
+                return table.getColumnFamily(defaultCF).getGuidePosts();
+            } catch (ColumnFamilyNotFoundException cfne) {
+                // Alter table does this
+            }
+        }
+        return gps;
+    }
+
+    /**
+     * Compute the list of parallel scans to run for a given query. The inner scans
+     * may be concatenated together directly, while the other ones may need to be
+     * merge sorted, depending on the query.
+     * @param physicalTable
+     * @return list of parallel scans to run for a given query.
+     * @throws SQLException
+     */
+    private List<List<Scan>> getParallelScans(PTable physicalTable) throws SQLException {
+        List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
+                .getAllTableRegions(physicalTable.getPhysicalName().getBytes());
+        List<byte[]> regionBoundaries = toBoundaries(regionLocations);
+        final Scan scan = context.getScan();
+        ScanRanges scanRanges = context.getScanRanges();
+        boolean isSalted = physicalTable.getBucketNum() != null;
+        boolean isLocalIndex = physicalTable.getIndexType() == IndexType.LOCAL;
+        List<byte[]> gps = getGuidePosts(physicalTable);
+        boolean traverseAllRegions = isSalted || isLocalIndex;
+
+        byte[] startKey = ByteUtil.EMPTY_BYTE_ARRAY;
+        byte[] currentKey = ByteUtil.EMPTY_BYTE_ARRAY;
+        byte[] stopKey = ByteUtil.EMPTY_BYTE_ARRAY;
+        int regionIndex = 0;
+        int stopIndex = regionBoundaries.size();
+        if (!traverseAllRegions) {
+            startKey = scan.getStartRow();
+            if (startKey.length > 0) {
+                currentKey = startKey;
+                regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
+            }
+            stopKey = scan.getStopRow();
+            if (stopKey.length > 0) {
+                stopIndex = Math.min(stopIndex, getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
+            }
+        }
+        List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
+
+        int guideIndex = currentKey.length == 0 ? 0 : getIndexContainingInclusive(gps, currentKey);
+        int gpsSize = gps.size();
+        int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / regionLocations.size() + 1;
+        int keyOffset = 0;
+        // Merge bisect with guideposts for all but the last region
+        while (regionIndex <= stopIndex) {
+            byte[] currentGuidePost;
+            byte[] endKey = regionIndex == stopIndex ? stopKey : regionBoundaries.get(regionIndex);
+            if (isLocalIndex) {
+                HRegionInfo regionInfo = regionLocations.get(regionIndex).getRegionInfo();
+                keyOffset = regionInfo.getStartKey().length > 0 ? regionInfo.getStartKey().length : regionInfo.getEndKey().length;
+            }
+            List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
+            while (guideIndex < gpsSize
+                    && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) {
+                Scan newScan = scanRanges.intersect(scan, currentKey, currentGuidePost, keyOffset);
+                if (newScan != null) {
+                    scans.add(newScan);
+                }
+                currentKey = currentGuidePost;
+                guideIndex++;
+            }
+            Scan newScan = scanRanges.intersect(scan, currentKey, endKey, keyOffset);
+            if (newScan != null) {
+                scans.add(newScan);
+            }
+            if (!scans.isEmpty()) {
+                parallelScans.add(scans);
+            }
+            currentKey = endKey;
+            regionIndex++;
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug(LogUtil.addCustomAnnotations("The parallelScans: " + parallelScans,
+                    ScanUtil.getCustomAnnotations(scan)));
+        }
+        return parallelScans;
+    }
+
+    private static void addConcatResultIterator(List<PeekingResultIterator> iterators, final List<PeekingResultIterator> concatIterators) {
+        if (!concatIterators.isEmpty()) {
+            if (concatIterators.size() == 1) {
+                iterators.add(concatIterators.get(0));
+            } else {
+                // TODO: should ConcatResultIterator have a constructor that takes
+                // a List<PeekingResultIterator>?
+                iterators.add(new ConcatResultIterator(new ResultIterators() {
+
+                    @Override
+                    public List<PeekingResultIterator> getIterators() throws SQLException {
+                        return concatIterators;
+                    }
+
+                    @Override
+                    public int size() {
+                        return concatIterators.size();
+                    }
+
+                    @Override
+                    public void explain(List<String> planSteps) {
+                        // TODO: review what we should do for the explain plan here
+                        concatIterators.get(0).explain(planSteps);
+                    }
+
+                }));
+            }
+        }
+    }
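getParallelScans above merges two sorted key lists, region boundaries and guideposts, with a two-pointer walk, emitting one scan chunk per interval. The shape of that merge, reduced to strings standing in for row keys (standalone sketch with hypothetical data):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: split [start, stop) at every region boundary AND every
    // guidepost, both sorted ascending.
    public class ChunkMergeSketch {
        public static void main(String[] args) {
            List<String> regionEnds = List.of("g", "p");        // region boundaries
            List<String> guideposts = List.of("c", "k", "r");   // stats guideposts
            List<String[]> chunks = new ArrayList<>();

            String current = "";                                 // "" = table start
            int g = 0;
            for (int r = 0; r <= regionEnds.size(); r++) {
                String end = r == regionEnds.size() ? "\uffff" : regionEnds.get(r);
                while (g < guideposts.size() && guideposts.get(g).compareTo(end) <= 0) {
                    chunks.add(new String[] { current, guideposts.get(g) });
                    current = guideposts.get(g++);
                }
                chunks.add(new String[] { current, end });
                current = end;
            }
            chunks.forEach(c -> System.out.println("[" + c[0] + ", " + c[1] + ")"));
            // [, c) [c, g) [g, k) [k, p) [p, r) [r, \uffff)
        }
    }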
    /**
     * Executes the scan in parallel across all regions, blocking until all scans are complete.
     * @return the result iterators for the scan of each region
     */
@@ -258,51 +433,60 @@
         boolean success = false;
         final ConnectionQueryServices services = context.getConnection().getQueryServices();
         ReadOnlyProps props = services.getProps();
-        int numSplits = splits.size();
+        int numSplits = size();
         List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
-        List<Pair<KeyRange,Future<PeekingResultIterator>>> futures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(numSplits);
+        List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits);
         final UUID scanId = UUID.randomUUID();
         try {
-            submitWork(scanId, splits, futures);
+            submitWork(scanId, scans, futures);
             int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
-            final int factor = ScanUtil.isReversed(this.context.getScan()) ? -1 : 1;
-            // Sort futures by row key so that we have a predictable order we're getting rows back for scans.
-            // We're going to wait here until they're finished anyway and this makes testing much easier.
-            Collections.sort(futures, new Comparator<Pair<KeyRange,Future<PeekingResultIterator>>>() {
-                @Override
-                public int compare(Pair<KeyRange, Future<PeekingResultIterator>> o1, Pair<KeyRange, Future<PeekingResultIterator>> o2) {
-                    return factor * Bytes.compareTo(o1.getFirst().getLowerRange(), o2.getFirst().getLowerRange());
-                }
-            });
             boolean clearedCache = false;
             byte[] tableName = tableRef.getTable().getPhysicalName().getBytes();
-            for (Pair<KeyRange,Future<PeekingResultIterator>> future : futures) {
-                try {
-                    PeekingResultIterator iterator = future.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
-                    iterators.add(iterator);
-                } catch (ExecutionException e) {
-                    try { // Rethrow as SQLException
-                        throw ServerUtil.parseServerException(e);
-                    } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
-                        List<Pair<KeyRange,Future<PeekingResultIterator>>> newFutures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(2);
-                        if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
-                            services.clearTableRegionCache(tableName);
-                            clearedCache = true;
-                        }
-                        List<KeyRange> allSplits = toKeyRanges(services.getAllTableRegions(tableName));
-                        // Intersect what was the expected boundary with all new region boundaries and
-                        // resubmit just this portion of work again
-                        List<KeyRange> newSubSplits = KeyRange.intersect(Collections.singletonList(future.getFirst()), allSplits);
-                        submitWork(scanId, newSubSplits, newFutures);
-                        for (Pair<KeyRange,Future<PeekingResultIterator>> newFuture : newFutures) {
-                            // Immediate do a get (not catching exception again) and then add the iterators we
-                            // get back immediately. They'll be sorted as expected, since they're replacing the
-                            // original one.
-                            PeekingResultIterator iterator = newFuture.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
-                            iterators.add(iterator);
+            for (List<Pair<Scan,Future<PeekingResultIterator>>> future : futures) {
+                List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
+                for (Pair<Scan,Future<PeekingResultIterator>> scanPair : future) {
+                    try {
+                        PeekingResultIterator iterator = scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                        concatIterators.add(iterator);
+                    } catch (ExecutionException e) {
+                        try { // Rethrow as SQLException
+                            throw ServerUtil.parseServerException(e);
+                        } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
+                            List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2);
+                            if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
+                                services.clearTableRegionCache(tableName);
+                                clearedCache = true;
+                            }
+                            List<KeyRange> allSplits = toKeyRanges(services.getAllTableRegions(tableName));
+                            // Intersect what was the expected boundary with all new region boundaries and
+                            // resubmit just this portion of work again
+                            Scan oldScan = scanPair.getFirst();
+                            List<KeyRange> newSubSplits = KeyRange.intersect(Collections.singletonList(KeyRange.getKeyRange(oldScan.getStartRow(), oldScan.getStopRow())), allSplits);
+                            List<List<Scan>> newNestedScans = Lists.newArrayListWithExpectedSize(2);
+                            for (KeyRange newSubSplit : newSubSplits) {
+                                Scan newScan = ScanUtil.newScan(scanPair.getFirst());
+                                newScan.setStartRow(newSubSplit.getLowerRange());
+                                newScan.setStopRow(newSubSplit.getUpperRange());
+                                newNestedScans.add(Collections.singletonList(newScan));
+                            }
+                            // Add any concatIterators that were successful so far
+                            // as we need these to be in order
+                            addConcatResultIterator(iterators, concatIterators);
+                            concatIterators = Collections.emptyList();
+                            submitWork(scanId, newNestedScans, newFutures);
+                            for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : newFutures) {
+                                for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : newFuture) {
+                                    // Immediately do a get (not catching the exception again) and add the iterators we
+                                    // get back. They'll be sorted as expected, since they're replacing the
+                                    // original one.
+                                    PeekingResultIterator iterator = newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                                    iterators.add(iterator);
+                                }
+                            }
                         }
                     }
                 }
+                addConcatResultIterator(iterators, concatIterators);
             }
             success = true;
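The recovery path above follows a general pattern: treat stale region metadata as retryable, refresh the shared cache at most once per query, rebuild only the failed slice of work from fresh metadata, and rerun it. The same shape in a generic standalone sketch (the exception and cache types are stand-ins, not Phoenix classes):

    import java.util.concurrent.Callable;

    // Sketch of a one-shot-refresh retry for stale shared metadata.
    public class StaleRetrySketch {
        static class StaleMetadataException extends RuntimeException {}
        interface RegionCache { void clear(); }

        static <T> T callWithOneRefresh(Callable<T> slice, RegionCache cache, boolean[] cleared) throws Exception {
            try {
                return slice.call();
            } catch (StaleMetadataException stale) {
                if (!cleared[0]) {      // clear shared cache only once per query
                    cache.clear();
                    cleared[0] = true;
                }
                return slice.call();    // resubmit just this slice; a second failure propagates
            }
        }

        public static void main(String[] args) throws Exception {
            boolean[] cleared = { false };
            int[] attempts = { 0 };
            String result = callWithOneRefresh(() -> {
                if (attempts[0]++ == 0) throw new StaleMetadataException();
                return "ok after refresh";
            }, () -> System.out.println("cache cleared"), cleared);
            System.out.println(result);
        }
    }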
@@ -322,70 +506,79 @@
         }
     }
 
-    private void submitWork(final UUID scanId, List<KeyRange> splits,
-            List<Pair<KeyRange,Future<PeekingResultIterator>>> futures) {
+    private static final class ScanLocation {
+        private final int outerListIndex;
+        private final int innerListIndex;
+        private final Scan scan;
+        public ScanLocation(Scan scan, int outerListIndex, int innerListIndex) {
+            this.outerListIndex = outerListIndex;
+            this.innerListIndex = innerListIndex;
+            this.scan = scan;
+        }
+        public int getOuterListIndex() {
+            return outerListIndex;
+        }
+        public int getInnerListIndex() {
+            return innerListIndex;
+        }
+        public Scan getScan() {
+            return scan;
+        }
+    }
+
+    private void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
+            List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures) {
         final ConnectionQueryServices services = context.getConnection().getQueryServices();
         ExecutorService executor = services.getExecutor();
-        final boolean localIndex = this.tableRef.getTable().getType() == PTableType.INDEX && this.tableRef.getTable().getIndexType() == IndexType.LOCAL;
-        for (final KeyRange split : splits) {
-            final Scan splitScan = ScanUtil.newScan(context.getScan());
-            // Intersect with existing start/stop key if the table is salted
-            // If not salted, we've already intersected it. If salted, we need
-            // to wait until now to intersect, as we're running parallel scans
-            // on all the possible regions here.
-            if (tableRef.getTable().getBucketNum() != null) {
-                KeyRange minMaxRange = context.getMinMaxRange();
-                if (minMaxRange != null) {
-                    // Add salt byte based on current split, as minMaxRange won't have it
-                    minMaxRange = SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange);
-                    // FIXME: seems like this should be possible when we set the scan start/stop
-                    // in StatementContext.setScanRanges(). If it doesn't intersect the range for
-                    // one salt byte, I don't see how it could intersect it with any of them.
-                    if (!ScanUtil.intersectScanRange(splitScan, minMaxRange.getLowerRange(), minMaxRange.getUpperRange())) {
-                        continue; // Skip this chunk if no intersection based on minMaxRange
-                    }
-                }
-            } else if (localIndex) {
-                // Used to detect stale region boundary information on server side
-                splitScan.setAttribute(EXPECTED_UPPER_REGION_KEY, split.getUpperRange());
-                if (splitScan.getStartRow().length != 0 || splitScan.getStopRow().length != 0) {
-                    SaltingUtil.addRegionStartKeyToScanStartAndStopRows(split.getLowerRange(),split.getUpperRange(), splitScan);
-                }
-            }
-            if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), split.getUpperRange(), this.context.getScanRanges().useSkipScanFilter())) {
-                Future<PeekingResultIterator> future =
-                    executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
+        // Pre-populate nestedFutures lists so that we can shuffle the scans
+        // and add the future to the right nested list. By shuffling the scans
+        // we get better utilization of the cluster since our thread executor
+        // will spray the scans across machines as opposed to targeting a
+        // single one since the scans are in row key order.
+        List<ScanLocation> scanLocations = Lists.newArrayListWithExpectedSize(splits.size());
+        for (int i = 0; i < nestedScans.size(); i++) {
+            List<Pair<Scan,Future<PeekingResultIterator>>> futures = Lists.newArrayListWithExpectedSize(scans.size());
+            nestedFutures.add(futures);
+            for (int j = 0; j < nestedScans.get(i).size(); j++) {
+                Scan scan = nestedScans.get(i).get(j);
+                scanLocations.add(new ScanLocation(scan, i, j));
+                futures.add(null); // placeholder
+            }
+        }
+        Collections.shuffle(scanLocations);
+        for (ScanLocation scanLocation : scanLocations) {
+            final Scan scan = scanLocation.getScan();
+            Future<PeekingResultIterator> future =
+                executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
 
-                        @Override
-                        public PeekingResultIterator call() throws Exception {
-                            long startTime = System.currentTimeMillis();
-                            ResultIterator scanner = new TableResultIterator(context, tableRef, splitScan);
-                            if (logger.isDebugEnabled()) {
-                                logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan, ScanUtil.getCustomAnnotations(splitScan)));
-                            }
-                            return iteratorFactory.newIterator(context, scanner, splitScan);
-                        }
+                    @Override
+                    public PeekingResultIterator call() throws Exception {
+                        long startTime = System.currentTimeMillis();
+                        ResultIterator scanner = new TableResultIterator(context, tableRef, scan);
+                        if (logger.isDebugEnabled()) {
+                            logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
                        }
+                        return iteratorFactory.newIterator(context, scanner, scan);
+                    }
 
-                        /**
-                         * Defines the grouping for round robin behavior. All threads spawned to process
-                         * this scan will be grouped together and time sliced with other simultaneously
-                         * executing parallel scans.
-                         */
-                        @Override
-                        public Object getJobId() {
-                            return ParallelIterators.this;
-                        }
-                    }, "Parallel scanner for table: " + tableRef.getTable().getName().getString()));
-                futures.add(new Pair<KeyRange,Future<PeekingResultIterator>>(split,future));
-            }
+                    /**
+                     * Defines the grouping for round robin behavior. All threads spawned to process
+                     * this scan will be grouped together and time sliced with other simultaneously
+                     * executing parallel scans.
+                     */
+                    @Override
+                    public Object getJobId() {
+                        return ParallelIterators.this;
+                    }
+                }, "Parallel scanner for table: " + tableRef.getTable().getName().getString()));
+            // Add our future in the right place so that we can concatenate the
+            // results of the inner futures versus merge sorting across all of them.
+            nestedFutures.get(scanLocation.getOuterListIndex()).set(scanLocation.getInnerListIndex(), new Pair<Scan,Future<PeekingResultIterator>>(scan,future));
         }
-        }
     }
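submitWork's shuffle trick decouples submission order from result order: record each scan's (outer, inner) slot, shuffle only the submission list, and set() each future back into its slot. A standalone sketch with strings standing in for scans:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Sketch: submit shuffled, collect in order via positional write-back.
    public class ShuffleSubmitSketch {
        record Slot(String task, int outer, int inner) {}

        public static void main(String[] args) {
            List<List<String>> nestedTasks = List.of(List.of("a1", "a2"), List.of("b1"));
            List<List<String>> results = new ArrayList<>();
            List<Slot> slots = new ArrayList<>();
            for (int i = 0; i < nestedTasks.size(); i++) {
                List<String> row = new ArrayList<>(Collections.nCopies(nestedTasks.get(i).size(), (String) null));
                results.add(row);                                // placeholders keep positions stable
                for (int j = 0; j < nestedTasks.get(i).size(); j++) {
                    slots.add(new Slot(nestedTasks.get(i).get(j), i, j));
                }
            }
            Collections.shuffle(slots);                          // spread load; order no longer matters
            for (Slot s : slots) {
                results.get(s.outer()).set(s.inner(), "done:" + s.task()); // write back in place
            }
            System.out.println(results); // [[done:a1, done:a2], [done:b1]] regardless of shuffle
        }
    }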
 
     @Override
     public int size() {
-        return this.splits.size();
+        return this.scans.size();
     }
 
@@ -397,6 +590,6 @@
 
     @Override
     public String toString() {
-        return "ParallelIterators [splits=" + splits + "]";
+        return "ParallelIterators [scans=" + scans + "]";
     }
 }
\ No newline at end of file


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5d07cc0/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index fc79173..f0a9d88 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -264,7 +264,7 @@ public class ScanUtil {
 
     private static byte[] getKey(RowKeySchema schema, List<List<KeyRange>> slots, int[] slotSpan, Bound bound) {
         if (slots.isEmpty()) {
-            return null;
+            return KeyRange.UNBOUND;
         }
         int[] position = new int[slots.size()];
         int maxLength = 0;
@@ -276,7 +276,7 @@
         byte[] key = new byte[maxLength];
         int length = setKey(schema, slots, slotSpan, position, bound, key, 0, 0, position.length);
         if (length == 0) {
-            return null;
+            return KeyRange.UNBOUND;
         }
         if (length == maxLength) {
             return key;
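Returning KeyRange.UNBOUND (a zero-length array) instead of null from getKey means callers can feed the result straight into range math and Scan start/stop rows, since HBase already treats an empty row as unbounded. A small illustration of why the sentinel composes without null checks (standalone sketch; UNBOUND here mirrors KeyRange.UNBOUND's empty array):

    import java.util.Arrays;

    // Sketch: empty start = minus infinity, empty stop = plus infinity.
    public class UnboundSentinelSketch {
        static final byte[] UNBOUND = new byte[0];

        static boolean contains(byte[] start, byte[] stop, byte[] key) {
            return (start.length == 0 || Arrays.compareUnsigned(start, key) <= 0)
                && (stop.length == 0 || Arrays.compareUnsigned(key, stop) < 0);
        }

        public static void main(String[] args) {
            System.out.println(contains(UNBOUND, new byte[]{'m'}, new byte[]{'c'})); // true
            System.out.println(contains(new byte[]{'m'}, UNBOUND, new byte[]{'c'})); // false
        }
    }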