PHOENIX-1186 Pass scan for parallel chunk of work through to ParallelIteratorFactory
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b5971dae Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b5971dae Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b5971dae Branch: refs/heads/master Commit: b5971dae61b36f058d37dbc699548da63a068e4e Parents: 5c0a08e Author: James Taylor <jamestay...@apache.org> Authored: Tue Aug 19 18:35:07 2014 -0700 Committer: James Taylor <jamestay...@apache.org> Committed: Tue Aug 19 18:40:01 2014 -0700 ---------------------------------------------------------------------- .../phoenix/compile/StatementContext.java | 24 -------------------- .../phoenix/iterate/ChunkedResultIterator.java | 6 +++++ .../phoenix/iterate/ParallelIterators.java | 5 ++-- 3 files changed, 8 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5971dae/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java index 5bebfd8..1c75527 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java @@ -102,30 +102,6 @@ public class StatementContext { } /** - * Copy constructor where an altered scan can be set. - * - * @param stmtContext the {@code StatementContext} to be copied - * @param scan the customized scan - */ - public StatementContext(StatementContext stmtContext, Scan scan) { - this.statement = stmtContext.statement; - this.resolver = stmtContext.resolver; - this.scan = scan; - this.sequences = stmtContext.sequences; - this.binds = stmtContext.binds; - this.aggregates = stmtContext.aggregates; - this.expressions = stmtContext.expressions; - this.dateFormat = stmtContext.dateFormat; - this.dateFormatter = stmtContext.dateFormatter; - this.dateParser = stmtContext.dateParser; - this.numberFormat = stmtContext.numberFormat; - this.tempPtr = new ImmutableBytesWritable(); - this.currentTable = stmtContext.currentTable; - this.whereConditionColumns = stmtContext.whereConditionColumns; - this.dataColumns = stmtContext.getDataColumnsMap(); - } - - /** * build map from dataColumn to what will be its position in single KeyValue value bytes * returned from the coprocessor that joins from the index row back to the data row. * @param column http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5971dae/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java index 38e91bd..d7fbe79 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java @@ -30,12 +30,15 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ScanUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@code PeekingResultIterator} implementation that loads data in chunks. This is intended for * basic scan plans, to avoid loading large quantities of data from HBase in one go. */ public class ChunkedResultIterator implements PeekingResultIterator { + private static final Logger logger = LoggerFactory.getLogger(ChunkedResultIterator.class); private final ParallelIterators.ParallelIteratorFactory delegateIteratorFactory; private SingleChunkResultIterator singleChunkResultIterator; @@ -59,6 +62,7 @@ public class ChunkedResultIterator implements PeekingResultIterator { @Override public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException { scanner.close(); //close the iterator since we don't need it anymore. + if (logger.isDebugEnabled()) logger.debug("ChunkedResultIteratorFactory.newIterator over " + tableRef.getTable().getName().getString() + " with " + scan); return new ChunkedResultIterator(delegateFactory, context, tableRef, scan, context.getConnection().getQueryServices().getProps().getLong( QueryServices.SCAN_RESULT_CHUNK_SIZE, @@ -102,6 +106,7 @@ public class ChunkedResultIterator implements PeekingResultIterator { private PeekingResultIterator getResultIterator() throws SQLException { if (resultIterator == null) { + if (logger.isDebugEnabled()) logger.debug("Get first chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan); singleChunkResultIterator = new SingleChunkResultIterator( new TableResultIterator(context, tableRef, scan), chunkSize); resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan); @@ -109,6 +114,7 @@ public class ChunkedResultIterator implements PeekingResultIterator { singleChunkResultIterator.close(); scan = ScanUtil.newScan(scan); scan.setStartRow(Bytes.add(singleChunkResultIterator.getLastKey(), new byte[]{0})); + if (logger.isDebugEnabled()) logger.debug("Get next chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan); singleChunkResultIterator = new SingleChunkResultIterator( new TableResultIterator(context, tableRef, scan), chunkSize); resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5971dae/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index 687453f..3d03ddc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -360,13 +360,12 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { @Override public PeekingResultIterator call() throws Exception { - StatementContext scanContext = new StatementContext(context, splitScan); long startTime = System.currentTimeMillis(); - ResultIterator scanner = new TableResultIterator(scanContext, tableRef, splitScan); + ResultIterator scanner = new TableResultIterator(context, tableRef, splitScan); if (logger.isDebugEnabled()) { logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan); } - return iteratorFactory.newIterator(scanContext, scanner, splitScan); + return iteratorFactory.newIterator(context, scanner, splitScan); } /**