http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 5670dae..b125ecc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -42,6 +42,7 @@ import 
org.apache.phoenix.iterate.FilterAggregatingResultIterator;
 import org.apache.phoenix.iterate.GroupedAggregatingResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
+import org.apache.phoenix.iterate.OffsetResultIterator;
 import org.apache.phoenix.iterate.OrderedAggregatingResultIterator;
 import org.apache.phoenix.iterate.OrderedResultIterator;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
@@ -63,8 +64,6 @@ import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.ScanUtil;
 
-
-
 /**
  *
  * Query plan for aggregating queries
@@ -78,18 +77,19 @@ public class AggregatePlan extends BaseQueryPlan {
     private List<KeyRange> splits;
     private List<List<Scan>> scans;
 
-    public AggregatePlan(
-            StatementContext context, FilterableStatement statement, TableRef 
table, RowProjector projector,
-            Integer limit, OrderBy orderBy, ParallelIteratorFactory 
parallelIteratorFactory, GroupBy groupBy,
-            Expression having) {
-        this(context, statement, table, projector, limit, orderBy, 
parallelIteratorFactory, groupBy, having, null);
+    public AggregatePlan(StatementContext context, FilterableStatement 
statement, TableRef table,
+            RowProjector projector, Integer limit, Integer offset, OrderBy 
orderBy,
+            ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, 
Expression having) {
+        this(context, statement, table, projector, limit, offset, orderBy, 
parallelIteratorFactory, groupBy, having,
+                null);
     }
-    
-    private AggregatePlan(
-            StatementContext context, FilterableStatement statement, TableRef 
table, RowProjector projector,
-            Integer limit, OrderBy orderBy, ParallelIteratorFactory 
parallelIteratorFactory, GroupBy groupBy,
-            Expression having, Expression dynamicFilter) {
-        super(context, statement, table, projector, 
context.getBindManager().getParameterMetaData(), limit, orderBy, groupBy, 
parallelIteratorFactory, dynamicFilter);
+
+    private AggregatePlan(StatementContext context, FilterableStatement 
statement, TableRef table,
+            RowProjector projector, Integer limit, Integer offset, OrderBy 
orderBy,
+            ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, 
Expression having,
+            Expression dynamicFilter) {
+        super(context, statement, table, projector, 
context.getBindManager().getParameterMetaData(), limit, offset,
+                orderBy, groupBy, parallelIteratorFactory, dynamicFilter);
         this.having = having;
         this.aggregators = context.getAggregationManager().getAggregators();
     }
@@ -195,12 +195,13 @@ public class AggregatePlan extends BaseQueryPlan {
                  *    order, so we can early exit, even when aggregate 
functions are used, as
                  *    the rows in the group are contiguous.
                  */
-                
context.getScan().setAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT, 
PInteger.INSTANCE.toBytes(limit));
+                
context.getScan().setAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT,
+                        PInteger.INSTANCE.toBytes(limit + (offset == null ? 0 
: offset)));
             }
         }
-        BaseResultIterators iterators = 
statement.getHint().hasHint(HintNode.Hint.SERIAL) ?
-                new SerialIterators(this, null, wrapParallelIteratorFactory(), 
scanGrouper) :
-                new ParallelIterators(this, null, 
wrapParallelIteratorFactory());
+        BaseResultIterators iterators = 
statement.getHint().hasHint(HintNode.Hint.SERIAL)
+                ? new SerialIterators(this, null, null, 
wrapParallelIteratorFactory(), scanGrouper)
+                : new ParallelIterators(this, null, 
wrapParallelIteratorFactory());
 
         splits = iterators.getSplits();
         scans = iterators.getScans();
@@ -228,13 +229,17 @@ public class AggregatePlan extends BaseQueryPlan {
 
         ResultIterator resultScanner = aggResultIterator;
         if (orderBy.getOrderByExpressions().isEmpty()) {
+            if (offset != null) {
+                resultScanner = new OffsetResultIterator(aggResultIterator, 
offset);
+            }
             if (limit != null) {
-                resultScanner = new LimitingResultIterator(aggResultIterator, 
limit);
+                resultScanner = new LimitingResultIterator(resultScanner, 
limit);
             }
         } else {
             int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(
                     QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-            resultScanner = new 
OrderedAggregatingResultIterator(aggResultIterator, 
orderBy.getOrderByExpressions(), thresholdBytes, limit);
+            resultScanner = new 
OrderedAggregatingResultIterator(aggResultIterator, 
orderBy.getOrderByExpressions(),
+                    thresholdBytes, limit, offset);
         }
         if (context.getSequenceManager().getSequenceCount() > 0) {
             resultScanner = new SequenceResultIterator(resultScanner, 
context.getSequenceManager());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 0ee70ba..cedd23e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -99,6 +99,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
     protected final RowProjector projection;
     protected final ParameterMetaData paramMetaData;
     protected final Integer limit;
+    protected final Integer offset;
     protected final OrderBy orderBy;
     protected final GroupBy groupBy;
     protected final ParallelIteratorFactory parallelIteratorFactory;    
@@ -114,7 +115,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
 
     protected BaseQueryPlan(
             StatementContext context, FilterableStatement statement, TableRef 
table,
-            RowProjector projection, ParameterMetaData paramMetaData, Integer 
limit, OrderBy orderBy,
+            RowProjector projection, ParameterMetaData paramMetaData, Integer 
limit, Integer offset, OrderBy orderBy,
             GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory,
             Expression dynamicFilter) {
         this.context = context;
@@ -124,6 +125,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         this.projection = projection;
         this.paramMetaData = paramMetaData;
         this.limit = limit;
+        this.offset = offset;
         this.orderBy = orderBy;
         this.groupBy = groupBy;
         this.parallelIteratorFactory = parallelIteratorFactory;
@@ -175,6 +177,11 @@ public abstract class BaseQueryPlan implements QueryPlan {
     public Integer getLimit() {
         return limit;
     }
+    
+    @Override
+    public Integer getOffset() {
+        return offset;
+    }
 
     @Override
     public RowProjector getProjector() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index c1ef67d..f4e374e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -17,7 +17,7 @@
  */
 package org.apache.phoenix.execute;
 
-import static org.apache.phoenix.query.QueryConstants.*;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -46,6 +46,7 @@ import org.apache.phoenix.iterate.FilterResultIterator;
 import org.apache.phoenix.iterate.GroupedAggregatingResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.OffsetResultIterator;
 import org.apache.phoenix.iterate.OrderedAggregatingResultIterator;
 import org.apache.phoenix.iterate.OrderedResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
@@ -70,8 +71,8 @@ public class ClientAggregatePlan extends ClientProcessingPlan 
{
     private final Aggregators clientAggregators;
     
     public ClientAggregatePlan(StatementContext context, FilterableStatement 
statement, TableRef table, RowProjector projector,
-            Integer limit, Expression where, OrderBy orderBy, GroupBy groupBy, 
Expression having, QueryPlan delegate) {
-        super(context, statement, table, projector, limit, where, orderBy, 
delegate);
+            Integer limit, Integer offset, Expression where, OrderBy orderBy, 
GroupBy groupBy, Expression having, QueryPlan delegate) {
+        super(context, statement, table, projector, limit, offset, where, 
orderBy, delegate);
         this.groupBy = groupBy;
         this.having = having;
         this.serverAggregators =
@@ -100,7 +101,7 @@ public class ClientAggregatePlan extends 
ClientProcessingPlan {
                 for (Expression keyExpression : keyExpressions) {
                     keyExpressionOrderBy.add(new 
OrderByExpression(keyExpression, false, true));
                 }
-                iterator = new OrderedResultIterator(iterator, 
keyExpressionOrderBy, thresholdBytes, null, 
projector.getEstimatedRowByteSize());
+                iterator = new OrderedResultIterator(iterator, 
keyExpressionOrderBy, thresholdBytes, null, null, 
projector.getEstimatedRowByteSize());
             }
             aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, groupBy.getKeyExpressions());
             aggResultIterator = new 
GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator),
 clientAggregators);
@@ -116,13 +117,16 @@ public class ClientAggregatePlan extends 
ClientProcessingPlan {
 
         ResultIterator resultScanner = aggResultIterator;
         if (orderBy.getOrderByExpressions().isEmpty()) {
+            if (offset != null) {
+                resultScanner = new OffsetResultIterator(resultScanner, 
offset);
+            }
             if (limit != null) {
-                resultScanner = new LimitingResultIterator(aggResultIterator, 
limit);
+                resultScanner = new LimitingResultIterator(resultScanner, 
limit);
             }
         } else {
             int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(
                     QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-            resultScanner = new 
OrderedAggregatingResultIterator(aggResultIterator, 
orderBy.getOrderByExpressions(), thresholdBytes, limit);
+            resultScanner = new 
OrderedAggregatingResultIterator(aggResultIterator, 
orderBy.getOrderByExpressions(), thresholdBytes, limit, offset);
         }
         if (context.getSequenceManager().getSequenceCount() > 0) {
             resultScanner = new SequenceResultIterator(resultScanner, 
context.getSequenceManager());
@@ -151,6 +155,9 @@ public class ClientAggregatePlan extends 
ClientProcessingPlan {
         if (statement.isDistinct() && statement.isAggregate()) {
             planSteps.add("CLIENT DISTINCT ON " + projector.toString());
         }
+        if (offset != null) {
+            planSteps.add("CLIENT OFFSET " + offset);
+        }
         if (orderBy.getOrderByExpressions().isEmpty()) {
             if (limit != null) {
                 planSteps.add("CLIENT " + limit + " ROW LIMIT");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
index b189933..2cd5237 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
@@ -39,11 +39,11 @@ public abstract class ClientProcessingPlan extends 
DelegateQueryPlan {
     protected final TableRef table;
     protected final RowProjector projector;
     protected final Integer limit;
+    protected final Integer offset;
     protected final Expression where;
     protected final OrderBy orderBy;
-
     public ClientProcessingPlan(StatementContext context, FilterableStatement 
statement, TableRef table, 
-            RowProjector projector, Integer limit, Expression where, OrderBy 
orderBy, QueryPlan delegate) {
+            RowProjector projector, Integer limit, Integer offset, Expression 
where, OrderBy orderBy, QueryPlan delegate) {
         super(delegate);
         this.context = context;
         this.statement = statement;
@@ -52,6 +52,7 @@ public abstract class ClientProcessingPlan extends 
DelegateQueryPlan {
         this.limit = limit;
         this.where = where;
         this.orderBy = orderBy;
+        this.offset = offset;
     }
     
     @Override
@@ -73,6 +74,11 @@ public abstract class ClientProcessingPlan extends 
DelegateQueryPlan {
     public Integer getLimit() {
         return limit;
     }
+    
+    @Override
+    public Integer getOffset() {
+        return offset;
+    }
 
     @Override
     public OrderBy getOrderBy() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 4bf1889..003c995 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -28,6 +28,7 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.iterate.FilterResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
+import org.apache.phoenix.iterate.OffsetResultIterator;
 import org.apache.phoenix.iterate.OrderedResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -41,12 +42,10 @@ import com.google.common.collect.Lists;
 
 public class ClientScanPlan extends ClientProcessingPlan {
 
-    public ClientScanPlan(StatementContext context,
-            FilterableStatement statement, TableRef table,
-            RowProjector projector, Integer limit, Expression where,
-            OrderBy orderBy, QueryPlan delegate) {
-        super(context, statement, table, projector, limit, where, orderBy,
-                delegate);
+    public ClientScanPlan(StatementContext context, FilterableStatement 
statement, TableRef table,
+            RowProjector projector, Integer limit, Integer offset, Expression 
where, OrderBy orderBy,
+            QueryPlan delegate) {
+        super(context, statement, table, projector, limit, offset, where, 
orderBy, delegate);
     }
 
     @Override
@@ -59,9 +58,15 @@ public class ClientScanPlan extends ClientProcessingPlan {
         if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
             int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(
                     QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-            iterator = new OrderedResultIterator(iterator, 
orderBy.getOrderByExpressions(), thresholdBytes, limit, 
projector.getEstimatedRowByteSize());
-        } else if (limit != null) {
-            iterator = new LimitingResultIterator(iterator, limit);
+            iterator = new OrderedResultIterator(iterator, 
orderBy.getOrderByExpressions(), thresholdBytes, limit,
+                    offset, projector.getEstimatedRowByteSize());
+        } else {
+            if (offset != null) {
+                iterator = new OffsetResultIterator(iterator, offset);
+            }
+            if (limit != null) {
+                iterator = new LimitingResultIterator(iterator, limit);
+            }
         }
         
         if (context.getSequenceManager().getSequenceCount() > 0) {
@@ -78,9 +83,18 @@ public class ClientScanPlan extends ClientProcessingPlan {
             planSteps.add("CLIENT FILTER BY " + where.toString());
         }
         if (!orderBy.getOrderByExpressions().isEmpty()) {
-            planSteps.add("CLIENT" + (limit == null ? "" : " TOP " + limit + " 
ROW"  + (limit == 1 ? "" : "S"))  + " SORTED BY " + 
orderBy.getOrderByExpressions().toString());
-        } else if (limit != null) {
-            planSteps.add("CLIENT " + limit + " ROW LIMIT");
+            if (offset != null) {
+                planSteps.add("CLIENT OFFSET " + offset);
+            }
+            planSteps.add("CLIENT" + (limit == null ? "" : " TOP " + limit + " 
ROW" + (limit == 1 ? "" : "S"))
+                    + " SORTED BY " + 
orderBy.getOrderByExpressions().toString());
+        } else {
+            if (offset != null) {
+                planSteps.add("CLIENT OFFSET " + offset);
+            }
+            if (limit != null) {
+                planSteps.add("CLIENT " + limit + " ROW LIMIT");
+            }
         }
         if (context.getSequenceManager().getSequenceCount() > 0) {
             int nSequences = context.getSequenceManager().getSequenceCount();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index 21b25d6..36b725e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -37,7 +37,7 @@ import org.apache.phoenix.schema.TableRef;
 public class DegenerateQueryPlan extends BaseQueryPlan {
 
     public DegenerateQueryPlan(StatementContext context, FilterableStatement 
statement, TableRef table) {
-        super(context, statement, table, RowProjector.EMPTY_PROJECTOR, 
PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null, 
OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null, null);
+        super(context, statement, table, RowProjector.EMPTY_PROJECTOR, 
PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null,null, 
OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null, null);
         context.setScanRanges(ScanRanges.NOTHING);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
index 56e0ccd..8f0d224 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -120,4 +120,8 @@ public abstract class DelegateQueryPlan implements 
QueryPlan {
                return delegate.getOperation();
        }
        
+       @Override
+    public Integer getOffset() {
+        return delegate.getOffset();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index ab13e6c..fe767d9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -43,15 +43,15 @@ public class LiteralResultIterationPlan extends 
BaseQueryPlan {
 
     public LiteralResultIterationPlan(StatementContext context, 
             FilterableStatement statement, TableRef tableRef, RowProjector 
projection, 
-            Integer limit, OrderBy orderBy, ParallelIteratorFactory 
parallelIteratorFactory) {
+            Integer limit, Integer offset, OrderBy orderBy, 
ParallelIteratorFactory parallelIteratorFactory) {
         this(Collections.<Tuple> singletonList(new 
SingleKeyValueTuple(KeyValue.LOWESTKEY)), 
-                context, statement, tableRef, projection, limit, orderBy, 
parallelIteratorFactory);
+                context, statement, tableRef, projection, limit, offset, 
orderBy, parallelIteratorFactory);
     }
 
     public LiteralResultIterationPlan(Iterable<Tuple> tuples, StatementContext 
context, 
             FilterableStatement statement, TableRef tableRef, RowProjector 
projection, 
-            Integer limit, OrderBy orderBy, ParallelIteratorFactory 
parallelIteratorFactory) {
-        super(context, statement, tableRef, projection, 
context.getBindManager().getParameterMetaData(), limit, orderBy, 
GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory, null);
+            Integer limit, Integer offset, OrderBy orderBy, 
ParallelIteratorFactory parallelIteratorFactory) {
+        super(context, statement, tableRef, projection, 
context.getBindManager().getParameterMetaData(), limit, offset, orderBy, 
GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory, null);
         this.tuples = tuples;
     }
 
@@ -77,6 +77,7 @@ public class LiteralResultIterationPlan extends BaseQueryPlan 
{
             private final Iterator<Tuple> tupleIterator = tuples.iterator();
             private boolean closed = false;
             private int count = 0;
+            private int offsetCount = 0;
 
             @Override
             public void close() throws SQLException {
@@ -85,6 +86,10 @@ public class LiteralResultIterationPlan extends 
BaseQueryPlan {
 
             @Override
             public Tuple next() throws SQLException {
+                while (!this.closed && (offset != null && offsetCount < 
offset) && tupleIterator.hasNext()) {
+                    offsetCount++;
+                    tupleIterator.next();
+                }
                 if (!this.closed 
                         && (limit == null || count++ < limit)
                         && tupleIterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 4a5cb83..93ae5d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -37,6 +37,7 @@ import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
 import org.apache.phoenix.iterate.MergeSortTopNResultIterator;
+import org.apache.phoenix.iterate.OffsetResultIterator;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ParallelIterators;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
@@ -59,6 +60,7 @@ import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
@@ -79,14 +81,14 @@ public class ScanPlan extends BaseQueryPlan {
     private List<List<Scan>> scans;
     private boolean allowPageFilter;
     
-    public ScanPlan(StatementContext context, FilterableStatement statement, 
TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, 
ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) 
throws SQLException {
-        this(context, statement, table, projector, limit, orderBy, 
parallelIteratorFactory, allowPageFilter, null);
+    public ScanPlan(StatementContext context, FilterableStatement statement, 
TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy 
orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean 
allowPageFilter) throws SQLException {
+        this(context, statement, table, projector, limit, offset, orderBy, 
parallelIteratorFactory, allowPageFilter, null);
     }
     
-    private ScanPlan(StatementContext context, FilterableStatement statement, 
TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, 
ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, 
Expression dynamicFilter) throws SQLException {
-        super(context, statement, table, projector, 
context.getBindManager().getParameterMetaData(), limit, orderBy, 
GroupBy.EMPTY_GROUP_BY,
+    private ScanPlan(StatementContext context, FilterableStatement statement, 
TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy 
orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean 
allowPageFilter, Expression dynamicFilter) throws SQLException {
+        super(context, statement, table, projector, 
context.getBindManager().getParameterMetaData(), limit,offset, orderBy, 
GroupBy.EMPTY_GROUP_BY,
                 parallelIteratorFactory != null ? parallelIteratorFactory :
-                        buildResultIteratorFactory(context, statement, table, 
orderBy, limit, allowPageFilter), dynamicFilter);
+                        buildResultIteratorFactory(context, statement, table, 
orderBy, limit, offset, allowPageFilter), dynamicFilter);
         this.allowPageFilter = allowPageFilter;
         if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
             int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(
@@ -96,7 +98,7 @@ public class ScanPlan extends BaseQueryPlan {
     }
 
     private static boolean isSerial(StatementContext context, 
FilterableStatement statement,
-            TableRef tableRef, OrderBy orderBy, Integer limit, boolean 
allowPageFilter) throws SQLException {
+            TableRef tableRef, OrderBy orderBy, Integer limit, Integer offset, 
boolean allowPageFilter) throws SQLException {
         if (statement.getHint().hasHint(HintNode.Hint.SERIAL)) {
             return true;
         }
@@ -142,13 +144,11 @@ public class ScanPlan extends BaseQueryPlan {
     }
     
     private static ParallelIteratorFactory 
buildResultIteratorFactory(StatementContext context, FilterableStatement 
statement,
-            TableRef table, OrderBy orderBy, Integer limit, boolean 
allowPageFilter) throws SQLException {
+            TableRef table, OrderBy orderBy, Integer limit,Integer offset, 
boolean allowPageFilter) throws SQLException {
 
-        if (isSerial(context, statement, table, orderBy, limit, 
allowPageFilter)
-                || ScanUtil.isRoundRobinPossible(orderBy, context)
-                || ScanUtil.isPacingScannersPossible(context)) {
-            return ParallelIteratorFactory.NOOP_FACTORY;
-        }
+        if ((isSerial(context, statement, table, orderBy, limit, offset, 
allowPageFilter)
+                || ScanUtil.isRoundRobinPossible(orderBy, context) || 
ScanUtil.isPacingScannersPossible(context))
+                && offset == null) { return 
ParallelIteratorFactory.NOOP_FACTORY; }
         ParallelIteratorFactory spoolingResultIteratorFactory =
                 new SpoolingResultIterator.SpoolingResultIteratorFactory(
                         context.getConnection().getQueryServices());
@@ -180,6 +180,13 @@ public class ScanPlan extends BaseQueryPlan {
             return scans;
     }
 
+    private static boolean isOffsetPossibleOnServer(StatementContext context, 
OrderBy orderBy, Integer offset,
+            boolean isSalted, IndexType indexType) {
+        return offset != null && orderBy.getOrderByExpressions().isEmpty()
+                && !((isSalted || indexType == IndexType.LOCAL)
+                        && ScanUtil.shouldRowsBeInRowKeyOrder(orderBy, 
context));
+    }
+
     @Override
     protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) 
throws SQLException {
         // Set any scan attributes before creating the scanner, as it will be 
too late afterwards
@@ -193,21 +200,31 @@ public class ScanPlan extends BaseQueryPlan {
          * limit is provided, run query serially.
          */
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
-        boolean isSerial = isSerial(context, statement, tableRef, orderBy, 
limit, allowPageFilter);
+        boolean isSerial = isSerial(context, statement, tableRef, orderBy, 
limit, offset, allowPageFilter);
         Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
+        if (perScanLimit != null) {
+            perScanLimit = QueryUtil.getOffsetLimit(perScanLimit, offset);
+        }
         BaseResultIterators iterators;
-        if (isSerial) {
-               iterators = new SerialIterators(this, perScanLimit, 
parallelIteratorFactory, scanGrouper);
+        boolean isOffsetOnServer = isOffsetPossibleOnServer(context, orderBy, 
offset, isSalted, table.getIndexType());
+        if (isOffsetOnServer) {
+            iterators = new SerialIterators(this, perScanLimit, offset, 
parallelIteratorFactory, scanGrouper);
+        } else if (isSerial) {
+            iterators = new SerialIterators(this, perScanLimit, null, 
parallelIteratorFactory, scanGrouper);
         } else {
-               iterators = new ParallelIterators(this, perScanLimit, 
parallelIteratorFactory, scanGrouper);
+            iterators = new ParallelIterators(this, perScanLimit, 
parallelIteratorFactory, scanGrouper);
         }
         splits = iterators.getSplits();
         scans = iterators.getScans();
         estimatedSize = iterators.getEstimatedByteCount();
         estimatedRows = iterators.getEstimatedRowCount();
-        
-        if (isOrdered) {
-            scanner = new MergeSortTopNResultIterator(iterators, limit, 
orderBy.getOrderByExpressions());
+        if (isOffsetOnServer) {
+            scanner = new ConcatResultIterator(iterators);
+            if (limit != null) {
+                scanner = new LimitingResultIterator(scanner, limit);
+            }
+        } else if (isOrdered) {
+            scanner = new MergeSortTopNResultIterator(iterators, limit, 
offset, orderBy.getOrderByExpressions());
         } else {
             if ((isSalted || table.getIndexType() == IndexType.LOCAL) && 
ScanUtil.shouldRowsBeInRowKeyOrder(orderBy, context)) {
                 /*
@@ -226,6 +243,9 @@ public class ScanPlan extends BaseQueryPlan {
             } else {
                 scanner = new ConcatResultIterator(iterators);
             }
+            if (offset != null) {
+                scanner = new OffsetResultIterator(scanner, offset);
+            }
             if (limit != null) {
                 scanner = new LimitingResultIterator(scanner, limit);
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 41cae79..e181e80 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -187,6 +187,11 @@ public class SortMergeJoinPlan implements QueryPlan {
     }
 
     @Override
+    public Integer getOffset() {
+        return null;
+    }
+
+    @Override
     public OrderBy getOrderBy() {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 3208913..808141e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.MergeSortTopNResultIterator;
+import org.apache.phoenix.iterate.OffsetResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.UnionResultIterators;
@@ -56,6 +57,7 @@ public class UnionPlan implements QueryPlan {
     private final OrderBy orderBy;
     private final StatementContext parentContext;
     private final Integer limit;
+    private final Integer offset;
     private final GroupBy groupBy;
     private final RowProjector projector;
     private final boolean isDegenerate;
@@ -63,7 +65,7 @@ public class UnionPlan implements QueryPlan {
     private UnionResultIterators iterators;
 
     public UnionPlan(StatementContext context, FilterableStatement statement, 
TableRef table, RowProjector projector,
-            Integer limit, OrderBy orderBy, GroupBy groupBy, List<QueryPlan> 
plans, ParameterMetaData paramMetaData) throws SQLException {
+            Integer limit, Integer offset, OrderBy orderBy, GroupBy groupBy, 
List<QueryPlan> plans, ParameterMetaData paramMetaData) throws SQLException {
         this.parentContext = context;
         this.statement = statement;
         this.tableRef = table;
@@ -72,6 +74,7 @@ public class UnionPlan implements QueryPlan {
         this.orderBy = orderBy;
         this.groupBy = groupBy;
         this.plans = plans;
+        this.offset= offset;
         this.paramMetaData = paramMetaData;
         boolean isDegen = true;
         for (QueryPlan plan : plans) {           
@@ -123,6 +126,11 @@ public class UnionPlan implements QueryPlan {
     }
 
     @Override
+    public Integer getOffset() {
+        return offset;
+    }
+
+    @Override
     public RowProjector getProjector() {
         return projector;
     }
@@ -143,9 +151,12 @@ public class UnionPlan implements QueryPlan {
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
 
         if (isOrdered) { // TopN
-            scanner = new MergeSortTopNResultIterator(iterators, limit, 
orderBy.getOrderByExpressions());
+            scanner = new MergeSortTopNResultIterator(iterators, limit, 
offset, orderBy.getOrderByExpressions());
         } else {
             scanner = new ConcatResultIterator(iterators);
+            if (offset != null) {
+                scanner = new OffsetResultIterator(scanner, offset);
+            }
             if (limit != null) {
                 scanner = new LimitingResultIterator(scanner, limit);
             }          

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 8d4b34b..0299f18 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -62,6 +62,7 @@ import 
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.ScanPlan;
 import org.apache.phoenix.filter.ColumnProjectionFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
@@ -152,7 +153,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         return true;
     }
     
-    private static void initializeScan(QueryPlan plan, Integer perScanLimit) {
+    private static void initializeScan(QueryPlan plan, Integer perScanLimit, 
Integer offset) {
         StatementContext context = plan.getContext();
         TableRef tableRef = plan.getTableRef();
         PTable table = tableRef.getTable();
@@ -218,6 +219,10 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
             if (perScanLimit != null) {
                 ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
             }
+            
+            if(offset!=null){
+                ScanUtil.addOffsetAttribute(scan, offset);
+            }
 
             if (optimizeProjection) {
                 optimizeProjection(context, scan, table, statement);
@@ -326,8 +331,9 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         }
     }
     
-    public BaseResultIterators(QueryPlan plan, Integer perScanLimit, 
ParallelScanGrouper scanGrouper) throws SQLException {
-        super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), 
plan.getOrderBy(), plan.getStatement().getHint(), plan.getLimit());
+    public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer 
offset, ParallelScanGrouper scanGrouper) throws SQLException {
+        super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), 
plan.getOrderBy(),
+                plan.getStatement().getHint(), plan.getLimit(), plan 
instanceof ScanPlan ? plan.getOffset() : null);
         this.plan = plan;
         this.scanGrouper = scanGrouper;
         StatementContext context = plan.getContext();
@@ -341,7 +347,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         // Used to tie all the scans together during logging
         scanId = UUID.randomUUID().toString();
         
-        initializeScan(plan, perScanLimit);
+        initializeScan(plan, perScanLimit, offset);
         
         this.scans = getParallelScans();
         List<KeyRange> splitRanges = 
Lists.newArrayListWithExpectedSize(scans.size() * 
ESTIMATED_GUIDEPOSTS_PER_REGION);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 1b623a4..7f403b0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -60,18 +60,21 @@ public abstract class ExplainTable {
     protected final OrderBy orderBy;
     protected final HintNode hint;
     protected final Integer limit;
+    protected final Integer offset;
    
     public ExplainTable(StatementContext context, TableRef table) {
-        this(context,table,GroupBy.EMPTY_GROUP_BY, OrderBy.EMPTY_ORDER_BY, 
HintNode.EMPTY_HINT_NODE, null);
+        this(context, table, GroupBy.EMPTY_GROUP_BY, OrderBy.EMPTY_ORDER_BY, 
HintNode.EMPTY_HINT_NODE, null, null);
     }
 
-    public ExplainTable(StatementContext context, TableRef table, GroupBy 
groupBy, OrderBy orderBy, HintNode hintNode, Integer limit) {
+    public ExplainTable(StatementContext context, TableRef table, GroupBy 
groupBy, OrderBy orderBy, HintNode hintNode,
+            Integer limit, Integer offset) {
         this.context = context;
         this.tableRef = table;
         this.groupBy = groupBy;
         this.orderBy = orderBy;
         this.hint = hintNode;
         this.limit = limit;
+        this.offset = offset;
     }
 
     private boolean explainSkipScan(StringBuilder buf) {
@@ -154,8 +157,13 @@ public abstract class ExplainTable {
         if (!orderBy.getOrderByExpressions().isEmpty() && groupBy.isEmpty()) { 
// with GROUP BY, sort happens client-side
             planSteps.add("    SERVER" + (limit == null ? "" : " TOP " + limit 
+ " ROW" + (limit == 1 ? "" : "S"))
                     + " SORTED BY " + 
orderBy.getOrderByExpressions().toString());
-        } else if (pageFilter != null) {
-            planSteps.add("    SERVER " + pageFilter.getPageSize() + " ROW 
LIMIT");                
+        } else {
+            if (offset != null) {
+                planSteps.add("    SERVER OFFSET " + offset);
+            }
+            if (pageFilter != null) {
+                planSteps.add("    SERVER " + pageFilter.getPageSize() + " ROW 
LIMIT");
+            }
         }
         Integer groupByLimit = null;
         byte[] groupByLimitBytes = 
scan.getAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
index c789093..7cf8d3e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
@@ -50,7 +50,7 @@ public class LimitingResultIterator extends 
DelegateResultIterator {
     @Override
     public void explain(List<String> planSteps) {
         super.explain(planSteps);
-        planSteps.add("CLIENT " + limit + " ROW LIMIT");
+            planSteps.add("CLIENT " + limit + " ROW LIMIT");
     }
 
        @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortTopNResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortTopNResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortTopNResultIterator.java
index 4c4097f..a9d8046 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortTopNResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortTopNResultIterator.java
@@ -36,13 +36,17 @@ public class MergeSortTopNResultIterator extends 
MergeSortResultIterator {
 
     private final int limit;
     private int count = 0;
+    private int offsetCount = 0;
     private final List<OrderByExpression> orderByColumns;
     private final ImmutableBytesWritable ptr1 = new ImmutableBytesWritable();
     private final ImmutableBytesWritable ptr2 = new ImmutableBytesWritable();
+    private final int offset;
     
-    public MergeSortTopNResultIterator(ResultIterators iterators, Integer 
limit, List<OrderByExpression> orderByColumns) {
+    public MergeSortTopNResultIterator(ResultIterators iterators, Integer 
limit, Integer offset,
+            List<OrderByExpression> orderByColumns) {
         super(iterators);
         this.limit = limit == null ? -1 : limit;
+        this.offset = offset == null ? -1 : offset;
         this.orderByColumns = orderByColumns;
     }
 
@@ -71,6 +75,10 @@ public class MergeSortTopNResultIterator extends 
MergeSortResultIterator {
 
     @Override
     public Tuple peek() throws SQLException {
+        while (offsetCount < offset) {
+            if (super.next() == null) { return null; }
+            offsetCount++;
+        }
         if (limit >= 0 && count >= limit) {
             return null;
         }
@@ -79,9 +87,11 @@ public class MergeSortTopNResultIterator extends 
MergeSortResultIterator {
 
     @Override
     public Tuple next() throws SQLException {
-        if (limit >= 0 && count++ >= limit) {
-            return null;
+        while (offsetCount < offset) {
+            if (super.next() == null) { return null; }
+            offsetCount++;
         }
+        if (limit >= 0 && count++ >= limit) { return null; }
         return super.next();
     }
 
@@ -90,12 +100,15 @@ public class MergeSortTopNResultIterator extends 
MergeSortResultIterator {
     public void explain(List<String> planSteps) {
         resultIterators.explain(planSteps);
         planSteps.add("CLIENT MERGE SORT");
+        if (offset > 0) {
+            planSteps.add("CLIENT OFFSET " + offset);
+        }
     }
 
        @Override
        public String toString() {
                return "MergeSortTopNResultIterator [limit=" + limit + ", 
count="
                                + count + ", orderByColumns=" + orderByColumns 
+ ", ptr1="
-                               + ptr1 + ", ptr2=" + ptr2 + "]";
+                               + ptr1 + ", ptr2=" + ptr2 + ",offset=" + offset 
+ "]";
        }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
new file mode 100644
index 0000000..ef8eacf
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Iterates through tuples up to a limit
+ *
+ * @since 1.2
+ */
+public class OffsetResultIterator extends DelegateResultIterator {
+    private int rowCount;
+    private int offset;
+
+    public OffsetResultIterator(ResultIterator delegate, Integer offset) {
+        super(delegate);
+        this.offset = offset == null ? -1 : offset;
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        while (rowCount < offset) {
+            if (super.next() == null) { return null; }
+            rowCount++;
+        }
+        return super.next();
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        super.explain(planSteps);
+        planSteps.add("CLIENT OFFSET " + offset);
+    }
+
+    @Override
+    public String toString() {
+        return "OffsetResultIterator [rowCount=" + rowCount + ", offset=" + 
offset + "]";
+    }
+
+    public Integer getUnusedOffset() {
+        return (offset - rowCount) > 0 ? (offset - rowCount) : 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
index 6f86605..da2be48 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
@@ -35,9 +35,9 @@ import org.apache.phoenix.schema.tuple.Tuple;
 public class OrderedAggregatingResultIterator extends OrderedResultIterator 
implements AggregatingResultIterator {
 
     public OrderedAggregatingResultIterator(AggregatingResultIterator delegate,
-                                List<OrderByExpression> orderByExpressions,
-                                int thresholdBytes, Integer limit) throws 
SQLException {
-        super (delegate, orderByExpressions, thresholdBytes, limit);
+            List<OrderByExpression> orderByExpressions, int thresholdBytes, 
Integer limit, Integer offset)
+                    throws SQLException {
+        super(delegate, orderByExpressions, thresholdBytes, limit, offset);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index d999ecb..8dcb2e8 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -92,6 +92,7 @@ public class OrderedResultIterator implements 
PeekingResultIterator {
 
     private final int thresholdBytes;
     private final Integer limit;
+    private final Integer offset;
     private final ResultIterator delegate;
     private final List<OrderByExpression> orderByExpressions;
     private final long estimatedByteSize;
@@ -103,24 +104,28 @@ public class OrderedResultIterator implements 
PeekingResultIterator {
         return delegate;
     }
     
-    public OrderedResultIterator(ResultIterator delegate,
-                                 List<OrderByExpression> orderByExpressions,
-                                 int thresholdBytes, Integer limit) {
-        this(delegate, orderByExpressions, thresholdBytes, limit, 0);
+    public OrderedResultIterator(ResultIterator delegate, 
List<OrderByExpression> orderByExpressions,
+            int thresholdBytes, Integer limit, Integer offset) {
+        this(delegate, orderByExpressions, thresholdBytes, limit, offset, 0);
     }
 
-    public OrderedResultIterator(ResultIterator delegate,
-            List<OrderByExpression> orderByExpressions, int thresholdBytes) 
throws SQLException {
-        this(delegate, orderByExpressions, thresholdBytes, null);
+    public OrderedResultIterator(ResultIterator delegate, 
List<OrderByExpression> orderByExpressions,
+            int thresholdBytes) throws SQLException {
+        this(delegate, orderByExpressions, thresholdBytes, null, null);
     }
 
     public OrderedResultIterator(ResultIterator delegate, 
List<OrderByExpression> orderByExpressions, 
-            int thresholdBytes, Integer limit, int estimatedRowSize) {
+            int thresholdBytes, Integer limit, Integer offset,int 
estimatedRowSize) {
         checkArgument(!orderByExpressions.isEmpty());
         this.delegate = delegate;
         this.orderByExpressions = orderByExpressions;
         this.thresholdBytes = thresholdBytes;
-        this.limit = limit;
+        this.offset = offset == null ? 0 : offset;
+        if (limit != null) {
+            this.limit = limit + this.offset;
+        } else {
+            this.limit = null;
+        }
         long estimatedEntrySize =
             // ResultEntry
             SizedUtil.OBJECT_SIZE + 
@@ -130,9 +135,9 @@ public class OrderedResultIterator implements 
PeekingResultIterator {
             SizedUtil.OBJECT_SIZE + estimatedRowSize;
 
         // Make sure we don't overflow Long, though this is really unlikely to 
happen.
-        assert(limit == null || Long.MAX_VALUE / estimatedEntrySize >= limit);
+        assert(limit == null || Long.MAX_VALUE / estimatedEntrySize >= limit + 
this.offset);
 
-        this.estimatedByteSize = limit == null ? 0 : limit * 
estimatedEntrySize;
+        this.estimatedByteSize = limit == null ? 0 : (limit + this.offset) * 
estimatedEntrySize;
     }
 
     public Integer getLimit() {
@@ -202,13 +207,20 @@ public class OrderedResultIterator implements 
PeekingResultIterator {
         List<Expression> expressions = 
Lists.newArrayList(Collections2.transform(orderByExpressions, TO_EXPRESSION));
         final Comparator<ResultEntry> comparator = 
buildComparator(orderByExpressions);
         try{
-            final MappedByteBufferSortedQueue queueEntries = new 
MappedByteBufferSortedQueue(comparator, limit, thresholdBytes);
+            final MappedByteBufferSortedQueue queueEntries = new 
MappedByteBufferSortedQueue(comparator, limit,
+                    thresholdBytes);
             resultIterator = new PeekingResultIterator() {
                 int count = 0;
+
                 @Override
                 public Tuple next() throws SQLException {
                     ResultEntry entry = queueEntries.poll();
-                    if (entry == null || (limit != null && ++count > limit)) {
+                    while (entry != null && offset != null && count < offset) {
+                        count++;
+                        if (entry.getResult() == null) { return null; }
+                        entry = queueEntries.poll();
+                    }
+                    if (entry == null || (limit != null && count++ > limit)) {
                         resultIterator.close();
                         resultIterator = PeekingResultIterator.EMPTY_ITERATOR;
                         return null;
@@ -218,13 +230,15 @@ public class OrderedResultIterator implements 
PeekingResultIterator {
                 
                 @Override
                 public Tuple peek() throws SQLException {
-                    if (limit != null && count > limit) {
-                        return null;
-                    }
-                    ResultEntry entry =  queueEntries.peek();
-                    if (entry == null) {
-                        return null;
+                    ResultEntry entry = queueEntries.peek();
+                    while (entry != null && offset != null && count < offset) {
+                        entry = queueEntries.poll();
+                        count++;
+                        if (entry == null) { return null; }
                     }
+                    if (limit != null && count > limit) { return null; }
+                    entry = queueEntries.peek();
+                    if (entry == null) { return null; }
                     return entry.getResult();
                 }
 
@@ -273,13 +287,15 @@ public class OrderedResultIterator implements 
PeekingResultIterator {
     @Override
     public void explain(List<String> planSteps) {
         delegate.explain(planSteps);
-        planSteps.add("CLIENT" + (limit == null ? "" : " TOP " + limit + " 
ROW"  + (limit == 1 ? "" : "S"))  + " SORTED BY " + 
orderByExpressions.toString());
+        planSteps.add("CLIENT" + (offset != null ? "" : " OFFSET " + offset)
+                + (limit == null ? "" : " TOP " + limit + " ROW" + (limit == 1 
? "" : "S")) + " SORTED BY "
+                + orderByExpressions.toString());
     }
 
     @Override
     public String toString() {
         return "OrderedResultIterator [thresholdBytes=" + thresholdBytes
-                + ", limit=" + limit + ", delegate=" + delegate
+                + ", limit=" + limit + ", offset=" + offset + ", delegate=" + 
delegate
                 + ", orderByExpressions=" + orderByExpressions
                 + ", estimatedByteSize=" + estimatedByteSize
                 + ", resultIterator=" + resultIterator + ", byteSize="

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 4339b05..1351735 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -30,8 +30,8 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.job.JobManager.JobCallable;
-import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.trace.util.Tracing;
@@ -56,7 +56,7 @@ public class ParallelIterators extends BaseResultIterators {
     
     public ParallelIterators(QueryPlan plan, Integer perScanLimit, 
ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
             throws SQLException {
-        super(plan, perScanLimit, scanGrouper);
+        super(plan, perScanLimit, null, scanGrouper);
         this.iteratorFactory = iteratorFactory;
     }   
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 60b9f44..f3b9e7d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -27,12 +27,14 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ScanUtil;
 
@@ -52,10 +54,13 @@ public class SerialIterators extends BaseResultIterators {
        private static final String NAME = "SERIAL";
     private final ParallelIteratorFactory iteratorFactory;
     
-    public SerialIterators(QueryPlan plan, Integer perScanLimit, 
ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
+    public SerialIterators(QueryPlan plan, Integer perScanLimit, Integer 
offset,
+            ParallelIteratorFactory iteratorFactory, ParallelScanGrouper 
scanGrouper)
             throws SQLException {
-        super(plan, perScanLimit, scanGrouper);
-        Preconditions.checkArgument(perScanLimit != null || 
plan.getStatement().getHint().hasHint(HintNode.Hint.SERIAL)); // must be a 
limit specified or a SERIAL hint
+        super(plan, perScanLimit, offset, scanGrouper);
+        // must be a offset or a limit specified or a SERIAL hint
+        Preconditions.checkArgument(
+                offset != null || perScanLimit != null || 
plan.getStatement().getHint().hasHint(HintNode.Hint.SERIAL));
         this.iteratorFactory = iteratorFactory;
     }
 
@@ -78,14 +83,18 @@ public class SerialIterators extends BaseResultIterators {
             final TaskExecutionMetricsHolder taskMetrics = new 
TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName);
             final PhoenixConnection conn = context.getConnection();
             final long renewLeaseThreshold = 
conn.getQueryServices().getRenewLeaseThresholdMilliSeconds();
+            lastScan.setAttribute(QueryConstants.LAST_SCAN, 
Bytes.toBytes(Boolean.TRUE));
             Future<PeekingResultIterator> future = 
executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
                 @Override
                 public PeekingResultIterator call() throws Exception {
+                    PeekingResultIterator previousIterator = null;
                        List<PeekingResultIterator> concatIterators = 
Lists.newArrayListWithExpectedSize(scans.size());
                        for (final Scan scan : scans) {
-                           TableResultIterator scanner = new 
TableResultIterator(mutationState, tableRef, scan, 
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), 
renewLeaseThreshold);
+                           TableResultIterator scanner = new 
TableResultIterator(mutationState, tableRef, scan, 
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), 
renewLeaseThreshold, previousIterator);
                            conn.addIterator(scanner);
-                           
concatIterators.add(iteratorFactory.newIterator(context, scanner, scan, 
tableName));
+                           PeekingResultIterator iterator = 
iteratorFactory.newIterator(context, scanner, scan, tableName);
+                           concatIterators.add(iterator);
+                           previousIterator = iterator;
                        }
                        PeekingResultIterator concatIterator = 
ConcatResultIterator.newIterator(concatIterators);
                     allIterators.add(concatIterator);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index d97a535..a86f899 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -32,12 +32,14 @@ import javax.annotation.concurrent.GuardedBy;
 import org.apache.hadoop.hbase.client.AbstractClientScanner;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -66,6 +68,7 @@ public class TableResultIterator implements ResultIterator {
 
     @GuardedBy("this")
     private long renewLeaseTime = 0;
+    private PeekingResultIterator previousIterator;
 
     @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE!
     TableResultIterator() {
@@ -79,14 +82,20 @@ public class TableResultIterator implements ResultIterator {
         RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, NOT_RENEWED
     };
 
-
-    public TableResultIterator(MutationState mutationState, TableRef tableRef, 
Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold) throws 
SQLException {
+    public TableResultIterator(MutationState mutationState, TableRef tableRef, 
Scan scan, CombinableMetric scanMetrics,
+                       long renewLeaseThreshold) throws SQLException {
+       this(mutationState,tableRef,scan,scanMetrics,renewLeaseThreshold,null);
+    }
+    
+    public TableResultIterator(MutationState mutationState, TableRef tableRef, 
Scan scan, CombinableMetric scanMetrics,
+            long renewLeaseThreshold, PeekingResultIterator previousIterator) 
throws SQLException {
         this.scan = scan;
         this.scanMetrics = scanMetrics;
         PTable table = tableRef.getTable();
         htable = mutationState.getHTable(table);
         this.scanIterator = UNINITIALIZED_SCANNER;
         this.renewLeaseThreshold = renewLeaseThreshold;
+        this.previousIterator = previousIterator;
     }
 
     @Override
@@ -118,6 +127,15 @@ public class TableResultIterator implements ResultIterator 
{
         ResultIterator delegate = this.scanIterator;
         if (delegate == UNINITIALIZED_SCANNER) {
             try {
+                if (previousIterator != null && 
scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET) != null) {
+                    byte[] unusedOffset = 
QueryUtil.getUnusedOffset(previousIterator.peek());
+                    if (unusedOffset != null) {
+                        
scan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, unusedOffset);
+                        previousIterator.next();
+                    } else {
+                        
scan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, null);
+                    }
+                }
                 this.scanIterator =
                         new ScanningResultIterator(htable.getScanner(scan), 
scanMetrics);
             } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 7e8969b..880e758 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -110,6 +110,7 @@ import org.apache.phoenix.parse.ListJarsStatement;
 import org.apache.phoenix.parse.LiteralParseNode;
 import org.apache.phoenix.parse.NamedNode;
 import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.OffsetNode;
 import org.apache.phoenix.parse.OrderByNode;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.ParseNode;
@@ -372,14 +373,14 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
     
     private static class ExecutableSelectStatement extends SelectStatement 
implements CompilableStatement {
         private ExecutableSelectStatement(TableNode from, HintNode hint, 
boolean isDistinct, List<AliasedNode> select, ParseNode where,
-                List<ParseNode> groupBy, ParseNode having, List<OrderByNode> 
orderBy, LimitNode limit, int bindCount, boolean isAggregate, boolean 
hasSequence, Map<String, UDFParseNode> udfParseNodes) {
-            this(from, hint, isDistinct, select, where, groupBy, having, 
orderBy, limit, bindCount, isAggregate, hasSequence, 
Collections.<SelectStatement>emptyList(), udfParseNodes);
+                List<ParseNode> groupBy, ParseNode having, List<OrderByNode> 
orderBy, LimitNode limit,OffsetNode offset, int bindCount, boolean isAggregate, 
boolean hasSequence, Map<String, UDFParseNode> udfParseNodes) {
+            this(from, hint, isDistinct, select, where, groupBy, having, 
orderBy, limit,offset, bindCount, isAggregate, hasSequence, 
Collections.<SelectStatement>emptyList(), udfParseNodes);
         }
 
         private ExecutableSelectStatement(TableNode from, HintNode hint, 
boolean isDistinct, List<AliasedNode> select, ParseNode where,
-                List<ParseNode> groupBy, ParseNode having, List<OrderByNode> 
orderBy, LimitNode limit, int bindCount, boolean isAggregate,
+                List<ParseNode> groupBy, ParseNode having, List<OrderByNode> 
orderBy, LimitNode limit,OffsetNode offset, int bindCount, boolean isAggregate,
                 boolean hasSequence, List<SelectStatement> selects, 
Map<String, UDFParseNode> udfParseNodes) {
-            super(from, hint, isDistinct, select, where, groupBy, having, 
orderBy, limit, bindCount, isAggregate, hasSequence, selects, udfParseNodes);
+            super(from, hint, isDistinct, select, where, groupBy, having, 
orderBy, limit, offset, bindCount, isAggregate, hasSequence, selects, 
udfParseNodes);
         }
         
         @SuppressWarnings("unchecked")
@@ -511,6 +512,11 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                 }
 
                 @Override
+                public Integer getOffset() {
+                    return null;
+                }
+
+                @Override
                 public OrderBy getOrderBy() {
                     return OrderBy.EMPTY_ORDER_BY;
                 }
@@ -1053,10 +1059,10 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
     protected static class ExecutableNodeFactory extends ParseNodeFactory {
         @Override
         public ExecutableSelectStatement select(TableNode from, HintNode hint, 
boolean isDistinct, List<AliasedNode> select, ParseNode where,
-                List<ParseNode> groupBy, ParseNode having, List<OrderByNode> 
orderBy, LimitNode limit, int bindCount, boolean isAggregate,
+                List<ParseNode> groupBy, ParseNode having, List<OrderByNode> 
orderBy, LimitNode limit, OffsetNode offset, int bindCount, boolean isAggregate,
                 boolean hasSequence, List<SelectStatement> selects, 
Map<String, UDFParseNode> udfParseNodes) {
             return new ExecutableSelectStatement(from, hint, isDistinct, 
select, where, groupBy == null ? Collections.<ParseNode>emptyList() : groupBy,
-                    having, orderBy == null ? 
Collections.<OrderByNode>emptyList() : orderBy, limit, bindCount, isAggregate, 
hasSequence, selects == null ? Collections.<SelectStatement>emptyList() : 
selects, udfParseNodes);
+                    having, orderBy == null ? 
Collections.<OrderByNode>emptyList() : orderBy, limit, offset, bindCount, 
isAggregate, hasSequence, selects == null ? 
Collections.<SelectStatement>emptyList() : selects, udfParseNodes);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java 
b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
index 31718e5..3161eb4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
@@ -222,7 +222,7 @@ public class HashJoinInfo {
                 forceProjection = input.readBoolean();
             } catch (EOFException ignore) {
             }
-            return new HashJoinInfo(joinedSchema, joinIds, joinExpressions, 
joinTypes, earlyEvaluation, schemas, fieldPositions, postJoinFilterExpression, 
limit >= 0 ? limit : null, forceProjection);
+            return new HashJoinInfo(joinedSchema, joinIds, joinExpressions, 
joinTypes, earlyEvaluation, schemas, fieldPositions, postJoinFilterExpression, 
limit >= 0 ? limit : null,  forceProjection);
         } catch (IOException e) {
             throw new RuntimeException(e);
         } finally {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 40591c2..6eb6cb0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -285,7 +285,7 @@ public class QueryOptimizer {
                             aliasedNodes.add(FACTORY.aliasedNode(null, 
indexColNode));
                             nodes.add(new ColumnParseNode(null, '"' + 
column.getName().getString() + '"'));
                         }
-                        SelectStatement innerSelect = 
FACTORY.select(indexSelect.getFrom(), indexSelect.getHint(), false, 
aliasedNodes, where, null, null, null, null, indexSelect.getBindCount(), false, 
indexSelect.hasSequence(), Collections.<SelectStatement>emptyList(), 
indexSelect.getUdfParseNodes());
+                        SelectStatement innerSelect = 
FACTORY.select(indexSelect.getFrom(), indexSelect.getHint(), false, 
aliasedNodes, where, null, null, null, null, null, indexSelect.getBindCount(), 
false, indexSelect.hasSequence(), Collections.<SelectStatement>emptyList(), 
indexSelect.getUdfParseNodes());
                         ParseNode outerWhere = FACTORY.in(nodes.size() == 1 ? 
nodes.get(0) : FACTORY.rowValueConstructor(nodes), 
FACTORY.subquery(innerSelect, false), false, true);
                         ParseNode extractedCondition = 
whereRewriter.getExtractedCondition();
                         if (extractedCondition != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
index 276e6aa..fb2d327 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
@@ -71,4 +71,10 @@ public class DeleteStatement extends DMLStatement implements 
FilterableStatement
     public Operation getOperation() {
         return Operation.DELETE;
     }
+
+    @Override
+    public OffsetNode getOffset() {
+        return null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/parse/FilterableStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/FilterableStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/FilterableStatement.java
index eda13c8..ad54d98 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/FilterableStatement.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/FilterableStatement.java
@@ -26,4 +26,5 @@ public interface FilterableStatement extends 
BindableStatement {
     public boolean isAggregate();
     public List<OrderByNode> getOrderBy();
     public LimitNode getLimit();
+    public OffsetNode getOffset();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/parse/OffsetNode.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/OffsetNode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/OffsetNode.java
new file mode 100644
index 0000000..0387f5c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/OffsetNode.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+
+public class OffsetNode {
+    private final BindParseNode bindNode;
+    private final LiteralParseNode offsetNode;
+    
+    OffsetNode(BindParseNode bindNode) {
+        this.bindNode = bindNode;
+        offsetNode = null;
+    }
+    
+    OffsetNode(LiteralParseNode limitNode) {
+        this.offsetNode = limitNode;
+        this.bindNode = null;
+    }
+    
+    public ParseNode getOffsetParseNode() {
+        return bindNode == null ? offsetNode : bindNode;
+    }
+    
+    @Override
+    public String toString() {
+        return bindNode == null ? offsetNode.toString() : bindNode.toString();
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((bindNode == null) ? 0 : 
bindNode.hashCode());
+        result = prime * result + ((offsetNode == null) ? 0 : 
offsetNode.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null) return false;
+        if (getClass() != obj.getClass()) return false;
+        OffsetNode other = (OffsetNode)obj;
+        if (bindNode == null) {
+            if (other.bindNode != null) return false;
+        } else if (!bindNode.equals(other.bindNode)) return false;
+        if (offsetNode == null) {
+            if (other.offsetNode != null) return false;
+        } else if (!offsetNode.equals(other.offsetNode)) return false;
+        return true;
+    }
+}

Reply via email to