Repository: phoenix
Updated Branches:
  refs/heads/master 1fe4af34f -> b5971dae6
PHOENIX-1186 Pass scan for parallel chunk of work through to ParallelIteratorFactory Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5c0a08e5 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5c0a08e5 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5c0a08e5 Branch: refs/heads/master Commit: 5c0a08e523a1f66d25c87af3a1fb396c36dc4249 Parents: 1fe4af3 Author: James Taylor <jamestay...@apache.org> Authored: Tue Aug 19 16:19:07 2014 -0700 Committer: James Taylor <jamestay...@apache.org> Committed: Tue Aug 19 18:39:34 2014 -0700 ---------------------------------------------------------------------- .../MutatingParallelIteratorFactory.java | 3 ++- .../apache/phoenix/execute/AggregatePlan.java | 9 +++++---- .../phoenix/iterate/ChunkedResultIterator.java | 21 ++++++++------------ .../phoenix/iterate/ParallelIterators.java | 4 ++-- .../phoenix/iterate/SpoolingResultIterator.java | 3 ++- 5 files changed, 19 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c0a08e5/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java index fbfce29..df91b1d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java @@ -26,6 +26,7 @@ import java.sql.SQLException; import java.util.List; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.execute.MutationState; import 
org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory; import org.apache.phoenix.iterate.PeekingResultIterator; @@ -58,7 +59,7 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato abstract protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException; @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator iterator) throws SQLException { + public PeekingResultIterator newIterator(StatementContext context, ResultIterator iterator, Scan scan) throws SQLException { final PhoenixConnection connection = new PhoenixConnection(this.connection); MutationState state = mutate(context, iterator, connection); long totalRowCount = state.getUpdateCount(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c0a08e5/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 67c7bb7..d45e036 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -22,6 +22,7 @@ import java.sql.SQLException; import java.util.Collections; import java.util.List; +import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.RowProjector; @@ -94,7 +95,7 @@ public class AggregatePlan extends BasicQueryPlan { this.services = services; } @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException { + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) 
throws SQLException { Expression expression = RowKeyExpression.INSTANCE; OrderByExpression orderByExpression = new OrderByExpression(expression, false, true); int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); @@ -111,9 +112,9 @@ public class AggregatePlan extends BasicQueryPlan { this.outerFactory = outerFactory; } @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException { - PeekingResultIterator iterator = innerFactory.newIterator(context, scanner); - return outerFactory.newIterator(context, iterator); + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException { + PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan); + return outerFactory.newIterator(context, iterator, scan); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c0a08e5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java index cfaca84..38e91bd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java @@ -18,7 +18,6 @@ package org.apache.phoenix.iterate; -import java.io.IOException; import java.sql.SQLException; import java.util.List; @@ -26,11 +25,11 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.compile.StatementContext; -import org.apache.phoenix.exception.PhoenixIOException; import org.apache.phoenix.query.QueryServices; import 
org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ScanUtil; /** * {@code PeekingResultIterator} implementation that loads data in chunks. This is intended for @@ -58,9 +57,9 @@ public class ChunkedResultIterator implements PeekingResultIterator { } @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException { + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException { scanner.close(); //close the iterator since we don't need it anymore. - return new ChunkedResultIterator(delegateFactory, context, tableRef, + return new ChunkedResultIterator(delegateFactory, context, tableRef, scan, context.getConnection().getQueryServices().getProps().getLong( QueryServices.SCAN_RESULT_CHUNK_SIZE, QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE)); @@ -68,11 +67,11 @@ public class ChunkedResultIterator implements PeekingResultIterator { } public ChunkedResultIterator(ParallelIterators.ParallelIteratorFactory delegateIteratorFactory, - StatementContext context, TableRef tableRef, long chunkSize) { + StatementContext context, TableRef tableRef, Scan scan, long chunkSize) { this.delegateIteratorFactory = delegateIteratorFactory; this.context = context; this.tableRef = tableRef; - this.scan = context.getScan(); + this.scan = scan; this.chunkSize = chunkSize; } @@ -105,18 +104,14 @@ public class ChunkedResultIterator implements PeekingResultIterator { if (resultIterator == null) { singleChunkResultIterator = new SingleChunkResultIterator( new TableResultIterator(context, tableRef, scan), chunkSize); - resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator); + resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan); } else if (resultIterator.peek() == null && 
!singleChunkResultIterator.isEndOfStreamReached()) { singleChunkResultIterator.close(); - try { - this.scan = new Scan(scan); - } catch (IOException e) { - throw new PhoenixIOException(e); - } + scan = ScanUtil.newScan(scan); scan.setStartRow(Bytes.add(singleChunkResultIterator.getLastKey(), new byte[]{0})); singleChunkResultIterator = new SingleChunkResultIterator( new TableResultIterator(context, tableRef, scan), chunkSize); - resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator); + resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan); } return resultIterator; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c0a08e5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index eb2cf71..687453f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -90,7 +90,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { private final ParallelIteratorFactory iteratorFactory; public static interface ParallelIteratorFactory { - PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException; + PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException; } private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min @@ -366,7 +366,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { if (logger.isDebugEnabled()) { logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan); } - return 
iteratorFactory.newIterator(scanContext, scanner); + return iteratorFactory.newIterator(scanContext, scanner, splitScan); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c0a08e5/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java index 1a256de..4672657 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java @@ -28,6 +28,7 @@ import java.sql.SQLException; import java.util.List; import org.apache.commons.io.output.DeferredFileOutputStream; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.compile.StatementContext; @@ -63,7 +64,7 @@ public class SpoolingResultIterator implements PeekingResultIterator { this.services = services; } @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException { + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException { return new SpoolingResultIterator(scanner, services); }