Repository: phoenix
Updated Branches:
  refs/heads/master 1fe4af34f -> b5971dae6


PHOENIX-1186 Pass scan for parallel chunk of work through to ParallelIteratorFactory


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5c0a08e5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5c0a08e5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5c0a08e5

Branch: refs/heads/master
Commit: 5c0a08e523a1f66d25c87af3a1fb396c36dc4249
Parents: 1fe4af3
Author: James Taylor <jamestay...@apache.org>
Authored: Tue Aug 19 16:19:07 2014 -0700
Committer: James Taylor <jamestay...@apache.org>
Committed: Tue Aug 19 18:39:34 2014 -0700

----------------------------------------------------------------------
 .../MutatingParallelIteratorFactory.java        |  3 ++-
 .../apache/phoenix/execute/AggregatePlan.java   |  9 +++++----
 .../phoenix/iterate/ChunkedResultIterator.java  | 21 ++++++++------------
 .../phoenix/iterate/ParallelIterators.java      |  4 ++--
 .../phoenix/iterate/SpoolingResultIterator.java |  3 ++-
 5 files changed, 19 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c0a08e5/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index fbfce29..df91b1d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -26,6 +26,7 @@ import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.PeekingResultIterator;
@@ -58,7 +59,7 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
     abstract protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException;
     
     @Override
-    public PeekingResultIterator newIterator(StatementContext context, ResultIterator iterator) throws SQLException {
+    public PeekingResultIterator newIterator(StatementContext context, ResultIterator iterator, Scan scan) throws SQLException {
         final PhoenixConnection connection = new PhoenixConnection(this.connection);
         MutationState state = mutate(context, iterator, connection);
         long totalRowCount = state.getUpdateCount();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c0a08e5/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 67c7bb7..d45e036 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -22,6 +22,7 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.RowProjector;
@@ -94,7 +95,7 @@ public class AggregatePlan extends BasicQueryPlan {
             this.services = services;
         }
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException {
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
             Expression expression = RowKeyExpression.INSTANCE;
             OrderByExpression orderByExpression = new OrderByExpression(expression, false, true);
             int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
@@ -111,9 +112,9 @@ public class AggregatePlan extends BasicQueryPlan {
             this.outerFactory = outerFactory;
         }
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException {
-            PeekingResultIterator iterator = innerFactory.newIterator(context, scanner);
-            return outerFactory.newIterator(context, iterator);
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
+            PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan);
+            return outerFactory.newIterator(context, iterator, scan);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c0a08e5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index cfaca84..38e91bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -18,7 +18,6 @@
 
 package org.apache.phoenix.iterate;
 
-import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
 
@@ -26,11 +25,11 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ScanUtil;
 
 /**
  * {@code PeekingResultIterator} implementation that loads data in chunks. This is intended for
@@ -58,9 +57,9 @@ public class ChunkedResultIterator implements PeekingResultIterator {
         }
 
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException {
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
             scanner.close(); //close the iterator since we don't need it anymore.
-            return new ChunkedResultIterator(delegateFactory, context, tableRef,
+            return new ChunkedResultIterator(delegateFactory, context, tableRef, scan,
                     context.getConnection().getQueryServices().getProps().getLong(
                                         QueryServices.SCAN_RESULT_CHUNK_SIZE,
                                         QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE));
@@ -68,11 +67,11 @@ public class ChunkedResultIterator implements PeekingResultIterator {
     }
 
     public ChunkedResultIterator(ParallelIterators.ParallelIteratorFactory delegateIteratorFactory,
-            StatementContext context, TableRef tableRef, long chunkSize) {
+            StatementContext context, TableRef tableRef, Scan scan, long chunkSize) {
         this.delegateIteratorFactory = delegateIteratorFactory;
         this.context = context;
         this.tableRef = tableRef;
-        this.scan = context.getScan();
+        this.scan = scan;
         this.chunkSize = chunkSize;
     }
 
@@ -105,18 +104,14 @@ public class ChunkedResultIterator implements PeekingResultIterator {
         if (resultIterator == null) {
             singleChunkResultIterator = new SingleChunkResultIterator(
                     new TableResultIterator(context, tableRef, scan), chunkSize);
-            resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator);
+            resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
         } else if (resultIterator.peek() == null && !singleChunkResultIterator.isEndOfStreamReached()) {
             singleChunkResultIterator.close();
-            try {
-                this.scan = new Scan(scan);
-            } catch (IOException e) {
-                throw new PhoenixIOException(e);
-            }
+            scan = ScanUtil.newScan(scan);
             scan.setStartRow(Bytes.add(singleChunkResultIterator.getLastKey(), new byte[]{0}));
             singleChunkResultIterator = new SingleChunkResultIterator(
                     new TableResultIterator(context, tableRef, scan), chunkSize);
-            resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator);
+            resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
         }
         return resultIterator;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c0a08e5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index eb2cf71..687453f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -90,7 +90,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
     private final ParallelIteratorFactory iteratorFactory;
     
     public static interface ParallelIteratorFactory {
-        PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException;
+        PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException;
     }
 
     private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min
@@ -366,7 +366,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
                         if (logger.isDebugEnabled()) {
                             logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan);
                         }
-                        return iteratorFactory.newIterator(scanContext, scanner);
+                        return iteratorFactory.newIterator(scanContext, scanner, splitScan);
                     }
 
                     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c0a08e5/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 1a256de..4672657 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -28,6 +28,7 @@ import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.commons.io.output.DeferredFileOutputStream;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.compile.StatementContext;
@@ -63,7 +64,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
             this.services = services;
         }
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException {
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
             return new SpoolingResultIterator(scanner, services);
         }
         

Reply via email to