Repository: phoenix
Updated Branches:
  refs/heads/master 91800b703 -> 776eea9ce


http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index b485666..4c6c1e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -681,11 +681,11 @@ public class ParseNodeFactory {
     }
 
     public SelectStatement select(TableNode from, HintNode hint, boolean 
isDistinct, List<AliasedNode> select, ParseNode where,
-            List<ParseNode> groupBy, ParseNode having, List<OrderByNode> 
orderBy, LimitNode limit, int bindCount, boolean isAggregate, 
+            List<ParseNode> groupBy, ParseNode having, List<OrderByNode> 
orderBy, LimitNode limit, OffsetNode offset, int bindCount, boolean 
isAggregate, 
             boolean hasSequence, List<SelectStatement> selects, Map<String, 
UDFParseNode> udfParseNodes) {
 
         return new SelectStatement(from, hint, isDistinct, select, where, 
groupBy == null ? Collections.<ParseNode>emptyList() : groupBy, having,
-                orderBy == null ? Collections.<OrderByNode>emptyList() : 
orderBy, limit, bindCount, isAggregate, hasSequence, selects == null ? 
Collections.<SelectStatement>emptyList() : selects, udfParseNodes);
+                orderBy == null ? Collections.<OrderByNode>emptyList() : 
orderBy, limit, offset, bindCount, isAggregate, hasSequence, selects == null ? 
Collections.<SelectStatement>emptyList() : selects, udfParseNodes);
     } 
     
     public UpsertStatement upsert(NamedTableNode table, HintNode hint, 
List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int 
bindCount, Map<String, UDFParseNode> udfParseNodes) {
@@ -698,89 +698,90 @@ public class ParseNodeFactory {
 
     public SelectStatement select(SelectStatement statement, ParseNode where) {
         return select(statement.getFrom(), statement.getHint(), 
statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(), 
statement.getHaving(),
-                statement.getOrderBy(), statement.getLimit(), 
statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), 
statement.getSelects(), statement.getUdfParseNodes());
+                statement.getOrderBy(), statement.getLimit(), 
statement.getOffset(), statement.getBindCount(), statement.isAggregate(), 
statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, ParseNode where, 
ParseNode having) {
         return select(statement.getFrom(), statement.getHint(), 
statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(), 
having,
-                statement.getOrderBy(), statement.getLimit(), 
statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), 
statement.getSelects(), statement.getUdfParseNodes());
+                statement.getOrderBy(), statement.getLimit(), 
statement.getOffset(), statement.getBindCount(), statement.isAggregate(), 
statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
     
     public SelectStatement select(SelectStatement statement, List<AliasedNode> 
select, ParseNode where, List<ParseNode> groupBy, ParseNode having, 
List<OrderByNode> orderBy) {
         return select(statement.getFrom(), statement.getHint(), 
statement.isDistinct(), 
-                select, where, groupBy, having, orderBy, statement.getLimit(), 
statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), 
statement.getSelects(), statement.getUdfParseNodes());
+                select, where, groupBy, having, orderBy, statement.getLimit(), 
statement.getOffset(), statement.getBindCount(), statement.isAggregate(), 
statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
     
     public SelectStatement select(SelectStatement statement, TableNode table) {
         return select(table, statement.getHint(), statement.isDistinct(), 
statement.getSelect(), statement.getWhere(), statement.getGroupBy(),
-                statement.getHaving(), statement.getOrderBy(), 
statement.getLimit(), statement.getBindCount(), statement.isAggregate(),
+                statement.getHaving(), statement.getOrderBy(), 
statement.getLimit(), statement.getOffset(), statement.getBindCount(), 
statement.isAggregate(),
                 statement.hasSequence(), statement.getSelects(), 
statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, TableNode table, 
ParseNode where) {
         return select(table, statement.getHint(), statement.isDistinct(), 
statement.getSelect(), where, statement.getGroupBy(),
-                statement.getHaving(), statement.getOrderBy(), 
statement.getLimit(), statement.getBindCount(), statement.isAggregate(),
+                statement.getHaving(), statement.getOrderBy(), 
statement.getLimit(), statement.getOffset(), statement.getBindCount(), 
statement.isAggregate(),
                 statement.hasSequence(), statement.getSelects(), 
statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, boolean 
isDistinct, List<AliasedNode> select) {
         return select(statement.getFrom(), statement.getHint(), isDistinct, 
select, statement.getWhere(), statement.getGroupBy(),
-                statement.getHaving(), statement.getOrderBy(), 
statement.getLimit(), statement.getBindCount(), statement.isAggregate(),
+                statement.getHaving(), statement.getOrderBy(), 
statement.getLimit(), statement.getOffset(), statement.getBindCount(), 
statement.isAggregate(),
                 statement.hasSequence(), statement.getSelects(), 
statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, boolean 
isDistinct, List<AliasedNode> select, ParseNode where) {
         return select(statement.getFrom(), statement.getHint(), isDistinct, 
select, where, statement.getGroupBy(),
-                statement.getHaving(), statement.getOrderBy(), 
statement.getLimit(), statement.getBindCount(), statement.isAggregate(),
+                statement.getHaving(), statement.getOrderBy(), 
statement.getLimit(), statement.getOffset(), statement.getBindCount(), 
statement.isAggregate(),
                 statement.hasSequence(), statement.getSelects(), 
statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, boolean 
isDistinct, List<AliasedNode> select, ParseNode where, List<ParseNode> groupBy, 
boolean isAggregate) {
         return select(statement.getFrom(), statement.getHint(), isDistinct, 
select, where, groupBy,
-                statement.getHaving(), statement.getOrderBy(), 
statement.getLimit(), statement.getBindCount(), isAggregate,
+                statement.getHaving(), statement.getOrderBy(), 
statement.getLimit(), statement.getOffset(), statement.getBindCount(), 
isAggregate,
                 statement.hasSequence(), statement.getSelects(), 
statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, List<OrderByNode> 
orderBy) {
         return select(statement.getFrom(), statement.getHint(), 
statement.isDistinct(), statement.getSelect(),
                 statement.getWhere(), statement.getGroupBy(), 
statement.getHaving(), orderBy, statement.getLimit(),
-                statement.getBindCount(), statement.isAggregate(), 
statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
+                statement.getOffset(), statement.getBindCount(), 
statement.isAggregate(), statement.hasSequence(), statement.getSelects(), 
statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, HintNode hint) {
         return hint == null || hint.isEmpty() ? statement : 
select(statement.getFrom(), hint, statement.isDistinct(), statement.getSelect(),
-                statement.getWhere(), statement.getGroupBy(), 
statement.getHaving(), statement.getOrderBy(), statement.getLimit(),
+                statement.getWhere(), statement.getGroupBy(), 
statement.getHaving(), statement.getOrderBy(), statement.getLimit(), 
statement.getOffset(), 
                 statement.getBindCount(), statement.isAggregate(), 
statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, HintNode hint, 
ParseNode where) {
         return select(statement.getFrom(), hint, statement.isDistinct(), 
statement.getSelect(), where, statement.getGroupBy(),
-                statement.getHaving(), statement.getOrderBy(), 
statement.getLimit(), statement.getBindCount(), statement.isAggregate(),
+                statement.getHaving(), statement.getOrderBy(), 
statement.getLimit(), statement.getOffset(), statement.getBindCount(), 
statement.isAggregate(),
                 statement.hasSequence(), statement.getSelects(), 
statement.getUdfParseNodes());
     }
 
-    public SelectStatement select(SelectStatement statement, List<OrderByNode> 
orderBy, LimitNode limit, int bindCount, boolean isAggregate) {
+    public SelectStatement select(SelectStatement statement, List<OrderByNode> 
orderBy, LimitNode limit, OffsetNode offset, int bindCount, boolean 
isAggregate) {
         return select(statement.getFrom(), statement.getHint(), 
statement.isDistinct(), statement.getSelect(),
-            statement.getWhere(), statement.getGroupBy(), 
statement.getHaving(), orderBy, limit,
+            statement.getWhere(), statement.getGroupBy(), 
statement.getHaving(), orderBy, limit, offset,
             bindCount, isAggregate || statement.isAggregate(), 
statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
 
     }
 
     public SelectStatement select(SelectStatement statement, LimitNode limit) {
         return select(statement.getFrom(), statement.getHint(), 
statement.isDistinct(), statement.getSelect(),
-            statement.getWhere(), statement.getGroupBy(), 
statement.getHaving(), statement.getOrderBy(), limit,
-            statement.getBindCount(), statement.isAggregate(), 
statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
+                statement.getWhere(), statement.getGroupBy(), 
statement.getHaving(), statement.getOrderBy(), limit,
+                statement.getOffset(), statement.getBindCount(), 
statement.isAggregate(), statement.hasSequence(),
+                statement.getSelects(), statement.getUdfParseNodes());
     }
 
-    public SelectStatement select(SelectStatement statement, List<OrderByNode> 
orderBy, LimitNode limit) {
+    public SelectStatement select(SelectStatement statement, List<OrderByNode> 
orderBy, LimitNode limit, OffsetNode offset) {
         return select(statement.getFrom(), statement.getHint(), 
statement.isDistinct(), statement.getSelect(),
-            statement.getWhere(), statement.getGroupBy(), 
statement.getHaving(), orderBy, limit,
+            statement.getWhere(), statement.getGroupBy(), 
statement.getHaving(), orderBy, limit,offset,
             statement.getBindCount(), statement.isAggregate(), 
statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
 
-    public SelectStatement select(List<SelectStatement> statements, 
List<OrderByNode> orderBy, LimitNode limit, int bindCount, boolean isAggregate) 
{
-        if (statements.size() == 1)
-            return select(statements.get(0), orderBy, limit, bindCount, 
isAggregate);        
+    public SelectStatement select(List<SelectStatement> statements, 
List<OrderByNode> orderBy, LimitNode limit,
+            OffsetNode offset, int bindCount, boolean isAggregate) {
+        if (statements.size() == 1) return select(statements.get(0), orderBy, 
limit, offset, bindCount, isAggregate);        
 
         // Get a list of adjusted aliases from a non-wildcard sub-select if 
any. 
         // We do not check the number of select nodes among all sub-selects, 
as 
@@ -813,7 +814,7 @@ public class ParseNodeFactory {
         }
         
         return select(null, HintNode.EMPTY_HINT_NODE, false, aliasedNodes, 
-                null, null, null, orderBy, limit, bindCount, false, false, 
statements, udfParseNodes);
+                null, null, null, orderBy, limit,offset, bindCount, false, 
false, statements, udfParseNodes);
     }
 
     public SubqueryParseNode subquery(SelectStatement select, boolean 
expectSingleRow) {
@@ -827,4 +828,12 @@ public class ParseNodeFactory {
     public LimitNode limit(LiteralParseNode l) {
         return new LimitNode(l);
     }
+
+    public OffsetNode offset(BindParseNode b) {
+        return new OffsetNode(b);
+    }
+
+    public OffsetNode offset(LiteralParseNode l) {
+        return new OffsetNode(l);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
index e48967b..12ef2e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
@@ -150,7 +150,7 @@ public class ParseNodeRewriter extends 
TraverseAllParseNodeVisitor<ParseNode> {
         }
         return NODE_FACTORY.select(normFrom, statement.getHint(), 
statement.isDistinct(),
                 normSelectNodes, normWhere, normGroupByNodes, normHaving, 
normOrderByNodes,
-                statement.getLimit(), statement.getBindCount(), 
statement.isAggregate(), statement.hasSequence(),
+                statement.getLimit(), statement.getOffset(), 
statement.getBindCount(), statement.isAggregate(), statement.hasSequence(),
                 statement.getSelects(), statement.getUdfParseNodes());
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
index 93c7364..a7c1a0b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
@@ -44,7 +44,7 @@ public class SelectStatement implements FilterableStatement {
                     Collections.<AliasedNode>singletonList(new 
AliasedNode(null, LiteralParseNode.ONE)),
                     null, Collections.<ParseNode>emptyList(),
                     null, Collections.<OrderByNode>emptyList(),
-                    null, 0, false, false, 
Collections.<SelectStatement>emptyList(), new HashMap<String, UDFParseNode>(1));
+                    null, null, 0, false, false, 
Collections.<SelectStatement>emptyList(), new HashMap<String, UDFParseNode>(1));
     public static final SelectStatement COUNT_ONE =
             new SelectStatement(
                     null, null, false,
@@ -56,14 +56,14 @@ public class SelectStatement implements FilterableStatement 
{
                                 new 
BuiltInFunctionInfo(CountAggregateFunction.class, 
CountAggregateFunction.class.getAnnotation(BuiltInFunction.class))))),
                     null, Collections.<ParseNode>emptyList(), 
                     null, Collections.<OrderByNode>emptyList(), 
-                    null, 0, true, false, 
Collections.<SelectStatement>emptyList(), new HashMap<String, UDFParseNode>(1));
+                    null,null, 0, true, false, 
Collections.<SelectStatement>emptyList(), new HashMap<String, UDFParseNode>(1));
     public static SelectStatement create(SelectStatement select, HintNode 
hint) {
         if (select.getHint() == hint || hint.isEmpty()) {
             return select;
         }
         return new SelectStatement(select.getFrom(), hint, 
select.isDistinct(), 
                 select.getSelect(), select.getWhere(), select.getGroupBy(), 
select.getHaving(), 
-                select.getOrderBy(), select.getLimit(), select.getBindCount(), 
select.isAggregate(), select.hasSequence(), select.getSelects(), 
select.getUdfParseNodes());
+                select.getOrderBy(), select.getLimit(), select.getOffset(), 
select.getBindCount(), select.isAggregate(), select.hasSequence(), 
select.getSelects(), select.getUdfParseNodes());
     }
     
     public SelectStatement combine(ParseNode where) {
@@ -75,21 +75,22 @@ public class SelectStatement implements FilterableStatement 
{
         }
         return new SelectStatement(this.getFrom(), this.getHint(), 
this.isDistinct(), 
                 this.getSelect(), where, this.getGroupBy(), this.getHaving(), 
-                this.getOrderBy(), this.getLimit(), this.getBindCount(), 
this.isAggregate(), this.hasSequence(), this.selects, this.udfParseNodes);
+                this.getOrderBy(), this.getLimit(), this.getOffset(), 
this.getBindCount(), this.isAggregate(), this.hasSequence(), this.selects, 
this.udfParseNodes);
     }
     
     public static SelectStatement create(SelectStatement select, 
List<AliasedNode> selects) {
         return new SelectStatement(select.getFrom(), select.getHint(), 
select.isDistinct(), 
                 selects, select.getWhere(), select.getGroupBy(), 
select.getHaving(), 
-                select.getOrderBy(), select.getLimit(), select.getBindCount(), 
select.isAggregate(), select.hasSequence(), select.getSelects(), 
select.getUdfParseNodes());
+                select.getOrderBy(), select.getLimit(), select.getOffset(), 
select.getBindCount(), select.isAggregate(), select.hasSequence(), 
select.getSelects(), select.getUdfParseNodes());
     }
     
     // Copy constructor for sub select statements in a union
-    public static SelectStatement create(SelectStatement select, 
-            List<OrderByNode> orderBy, LimitNode limit, boolean isAggregate) {
-        return new SelectStatement(select.getFrom(), select.getHint(), 
select.isDistinct(), 
-                select.getSelect(), select.getWhere(), select.getGroupBy(), 
select.getHaving(), 
-                orderBy, limit, select.getBindCount(), isAggregate, 
select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
+    public static SelectStatement create(SelectStatement select, 
List<OrderByNode> orderBy, LimitNode limit,
+            OffsetNode offset, boolean isAggregate) {
+        return new SelectStatement(select.getFrom(), select.getHint(), 
select.isDistinct(), select.getSelect(),
+                select.getWhere(), select.getGroupBy(), select.getHaving(), 
orderBy, limit, offset,
+                select.getBindCount(), isAggregate, select.hasSequence(), 
select.getSelects(),
+                select.getUdfParseNodes());
     }
 
     private final TableNode fromTable;
@@ -107,6 +108,7 @@ public class SelectStatement implements FilterableStatement 
{
     private final boolean hasWildcard;
     private final List<SelectStatement> selects = new 
ArrayList<SelectStatement>();
     private final Map<String, UDFParseNode> udfParseNodes;
+    private final OffsetNode offset;
     
     @Override
     public final String toString() {
@@ -155,6 +157,9 @@ public class SelectStatement implements FilterableStatement 
{
         if (limit != null) {
             buf.append(" LIMIT " + limit.toString());
         }
+        if (offset != null) {
+            buf.append(" OFFSET " + offset.toString());
+        }
     }    
 
     
@@ -221,7 +226,8 @@ public class SelectStatement implements FilterableStatement 
{
     
     protected SelectStatement(TableNode from, HintNode hint, boolean 
isDistinct, List<AliasedNode> select,
             ParseNode where, List<ParseNode> groupBy, ParseNode having, 
List<OrderByNode> orderBy, LimitNode limit,
-            int bindCount, boolean isAggregate, boolean hasSequence, 
List<SelectStatement> selects, Map<String, UDFParseNode> udfParseNodes) {
+            OffsetNode offset, int bindCount, boolean isAggregate, boolean 
hasSequence, List<SelectStatement> selects,
+            Map<String, UDFParseNode> udfParseNodes) {
         this.fromTable = from;
         this.hint = hint == null ? HintNode.EMPTY_HINT_NODE : hint;
         this.isDistinct = isDistinct;
@@ -231,6 +237,7 @@ public class SelectStatement implements FilterableStatement 
{
         this.having = having;
         this.orderBy = Collections.unmodifiableList(orderBy);
         this.limit = limit;
+        this.offset = offset;
         this.bindCount = bindCount;
         this.isAggregate = isAggregate || groupBy.size() != 
countConstants(groupBy) || this.having != null;
         this.hasSequence = hasSequence;
@@ -343,4 +350,10 @@ public class SelectStatement implements 
FilterableStatement {
     public Map<String, UDFParseNode> getUdfParseNodes() {
         return udfParseNodes;
     }
+
+    @Override
+    public OffsetNode getOffset() {
+        return offset;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 471ac73..6def7db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -138,6 +138,9 @@ public interface QueryConstants {
     public final static String SYSTEM_SCHEMA_NAME = "SYSTEM";
     public final static byte[] SYSTEM_SCHEMA_NAME_BYTES = 
Bytes.toBytes(SYSTEM_SCHEMA_NAME);
     public final static String PHOENIX_METADATA = "table";
+    public final static String OFFSET_ROW_KEY = "_OFFSET_";
+    public final static byte[] OFFSET_ROW_KEY_BYTES = 
Bytes.toBytes(OFFSET_ROW_KEY);
+    public final static ImmutableBytesPtr offsetRowKeyPtr = new 
ImmutableBytesPtr(OFFSET_ROW_KEY_BYTES);
 
     public final static PName SINGLE_COLUMN_NAME = 
PNameFactory.newNormalizedName("s");
     public final static PName SINGLE_COLUMN_FAMILY_NAME = 
PNameFactory.newNormalizedName("s");
@@ -328,5 +331,8 @@ public interface QueryConstants {
             // Install split policy to prevent a tenant's metadata from being 
split across regions.
             HTableDescriptor.SPLIT_POLICY + "='" + 
MetaDataSplitPolicy.class.getName() + "',\n" + 
             PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
+    public static final byte[] OFFSET_FAMILY = "f_offset".getBytes();
+    public static final byte[] OFFSET_COLUMN = "c_offset".getBytes();
+    public static final String LAST_SCAN = "LAST_SCAN";
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index f275f55..ded20e9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -34,17 +34,21 @@ import javax.annotation.Nullable;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.WildcardParseNode;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.tuple.Tuple;
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
@@ -81,6 +85,7 @@ public final class QueryUtil {
     private static final String FROM = "FROM";
     private static final String WHERE = "WHERE";
     private static final String[] CompareOpString = new 
String[CompareOp.values().length];
+
     static {
         CompareOpString[CompareOp.EQUAL.ordinal()] = "=";
         CompareOpString[CompareOp.NOT_EQUAL.ordinal()] = "!=";
@@ -352,5 +357,28 @@ public final class QueryUtil {
                 ("\"" + tableName + "\" ") +
                 (WHERE + " " + where);
     }
-    
+
+    public static Integer getOffsetLimit(Integer limit, Integer offset) {
+        if (limit == null) {
+            return null;
+        } else if (offset == null) {
+            return limit;
+        } else {
+            return limit + offset;
+        }
+
+    }
+
+    public static byte[] getUnusedOffset(Tuple offsetTuple) {
+        if (offsetTuple != null) {
+            ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr();
+            offsetTuple.getKey(rowKeyPtr);
+            if (QueryConstants.offsetRowKeyPtr.compareTo(rowKeyPtr) == 0) {
+                Cell value = 
offsetTuple.getValue(QueryConstants.OFFSET_FAMILY, 
QueryConstants.OFFSET_COLUMN);
+                return value.getValue();
+            }
+        }
+        return null;
+    }
+ 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 360db2c..46589b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -804,4 +804,8 @@ public class ScanUtil {
         return 
context.getConnection().getQueryServices().isRenewingLeasesEnabled();
     }
 
+    public static void addOffsetAttribute(Scan scan, Integer offset) {
+        scan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, 
Bytes.toBytes(offset));
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
index a8757ab..6d0930d 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
@@ -172,21 +172,43 @@ public class CorrelatePlanTest {
                 {2, "2", "2", 20},
                 {5, "5", "5", 100},
         };
-        testCorrelatePlan(LEFT_RELATION, rightRelation, 1, 0, JoinType.Inner, 
expected);        
+        testCorrelatePlan(LEFT_RELATION, rightRelation, 1, 0, JoinType.Inner, 
expected);
     }
     
-    private void testCorrelatePlan(Object[][] leftRelation, Object[][] 
rightRelation, int leftCorrelColumn, int rightCorrelColumn, JoinType type, 
Object[][] expectedResult) throws SQLException {        
+    @Test
+    public void testCorrelatePlanWithSingleValueOnlyAndOffset() throws 
SQLException {
+        Integer offset = 1;
+        Object[][] rightRelation = new Object[][] {
+                {"6", 60},
+                {"2", 20},
+                {"5", 100},
+                {"1", 10},
+        };
+        Object[][] expected = new Object[][] {
+                {2, "2", "2", 20},
+                {5, "5", "5", 100},
+        };
+        testCorrelatePlan(LEFT_RELATION, rightRelation, 1, 0, JoinType.Inner, 
expected, offset);
+    }
+
+    private void testCorrelatePlan(Object[][] leftRelation, Object[][] 
rightRelation, int leftCorrelColumn, int rightCorrelColumn, JoinType type, 
Object[][] expectedResult) throws SQLException {
+        testCorrelatePlan(leftRelation, rightRelation, leftCorrelColumn, 
rightCorrelColumn, type, expectedResult, null);
+    }
+
+    private void testCorrelatePlan(Object[][] leftRelation, Object[][] 
rightRelation, int leftCorrelColumn,
+            int rightCorrelColumn, JoinType type, Object[][] expectedResult, 
Integer offset) throws SQLException {
         TableRef leftTable = createProjectedTableFromLiterals(leftRelation[0]);
         TableRef rightTable = 
createProjectedTableFromLiterals(rightRelation[0]);
         String varName = "$cor0";
         RuntimeContext runtimeContext = new RuntimeContextImpl();
         runtimeContext.defineCorrelateVariable(varName, leftTable);
-        QueryPlan leftPlan = newLiteralResultIterationPlan(leftRelation);
-        QueryPlan rightPlan = newLiteralResultIterationPlan(rightRelation);
+        QueryPlan leftPlan = newLiteralResultIterationPlan(leftRelation, 
offset);
+        QueryPlan rightPlan = newLiteralResultIterationPlan(rightRelation, 
offset);
         Expression columnExpr = new ColumnRef(rightTable, 
rightCorrelColumn).newColumnExpression();
         Expression fieldAccess = new 
CorrelateVariableFieldAccessExpression(runtimeContext, varName, new 
ColumnRef(leftTable, leftCorrelColumn).newColumnExpression());
         Expression filter = ComparisonExpression.create(CompareOp.EQUAL, 
Arrays.asList(columnExpr, fieldAccess), CONTEXT.getTempPtr(), false);
-        rightPlan = new ClientScanPlan(CONTEXT, SelectStatement.SELECT_ONE, 
rightTable, RowProjector.EMPTY_PROJECTOR, null, filter, OrderBy.EMPTY_ORDER_BY, 
rightPlan);
+        rightPlan = new ClientScanPlan(CONTEXT, SelectStatement.SELECT_ONE, 
rightTable, RowProjector.EMPTY_PROJECTOR,
+                null, null, filter, OrderBy.EMPTY_ORDER_BY, rightPlan);
         PTable joinedTable = 
JoinCompiler.joinProjectedTables(leftTable.getTable(), rightTable.getTable(), 
type);
         CorrelatePlan correlatePlan = new CorrelatePlan(leftPlan, rightPlan, 
varName, type, false, runtimeContext, joinedTable, leftTable.getTable(), 
rightTable.getTable(), leftTable.getTable().getColumns().size());
         ResultIterator iter = correlatePlan.iterator();
@@ -202,8 +224,8 @@ public class CorrelatePlanTest {
             }
         }
     }
-    
-    private QueryPlan newLiteralResultIterationPlan(Object[][] rows) {
+
+    private QueryPlan newLiteralResultIterationPlan(Object[][] rows, Integer 
offset) {
         List<Tuple> tuples = Lists.newArrayList();
         Tuple baseTuple = new SingleKeyValueTuple(KeyValue.LOWESTKEY);
         for (Object[] row : rows) {
@@ -215,7 +237,8 @@ public class CorrelatePlanTest {
             tuples.add(projector.projectResults(baseTuple));
         }
         
-        return new LiteralResultIterationPlan(tuples, CONTEXT, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null);
+        return new LiteralResultIterationPlan(tuples, CONTEXT, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF,
+                RowProjector.EMPTY_PROJECTOR, null, offset, OrderBy.EMPTY_ORDER_BY, null);
     }
 
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
new file mode 100644
index 0000000..8ecf0ee
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.TupleProjectionCompiler;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.ProjectedColumnExpression;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class LiteralResultIteratorPlanTest {
+
+    private static final StatementContext CONTEXT;
+
+    static {
+        try {
+            PhoenixConnection connection = DriverManager
+                    .getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS)
+                    .unwrap(PhoenixConnection.class);
+            PhoenixStatement stmt = new PhoenixStatement(connection);
+            ColumnResolver resolver = FromCompiler.getResolverForQuery(SelectStatement.SELECT_ONE, connection);
+            CONTEXT = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static final Object[][] RELATION = new Object[][] {
+        {"2", 20},
+        {"2", 40},
+        {"5", 50},
+        {"6", 60},
+        {"5", 100},
+        {"1", 10},
+        {"3", 30},
+ };
+    PTable table = createProjectedTableFromLiterals(RELATION[0]).getTable();
+
+    @Test
+    public void testLiteralResultIteratorPlanWithOffset() throws SQLException {
+        Object[][] expected = new Object[][] {
+            {"2", 40},
+            {"5", 50},
+            {"6", 60},
+            {"5", 100},
+            {"1", 10},
+            {"3", 30},
+        };
+        testLiteralResultIteratorPlan(expected, 1, null);
+    }
+
+    @Test
+    public void testLiteralResultIteratorPlanWithLimit() throws SQLException {
+        Object[][] expected = new Object[][] {
+            {"2", 20},
+            {"2", 40},
+            {"5", 50},
+            {"6", 60},
+        };
+        testLiteralResultIteratorPlan(expected, null, 4);
+    }
+
+    @Test
+    public void testLiteralResultIteratorPlanWithLimitAndOffset() throws SQLException {
+        Object[][] expected = new Object[][] {
+            {"5", 50},
+            {"6", 60},
+            {"5", 100},
+            {"1", 10},
+            };
+        testLiteralResultIteratorPlan(expected, 2, 4);
+    }
+
+    private void testLiteralResultIteratorPlan(Object[][] expectedResult, Integer offset, Integer limit)
+            throws SQLException {
+
+        QueryPlan plan = newLiteralResultIterationPlan(offset, limit);
+        ResultIterator iter = plan.iterator();
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        for (Object[] row : expectedResult) {
+            Tuple next = iter.next();
+            assertNotNull(next);
+            for (int i = 0; i < row.length; i++) {
+                PColumn column = table.getColumns().get(i);
+                boolean eval = new ProjectedColumnExpression(column, table, column.getName().getString()).evaluate(next,
+                        ptr);
+                Object o = eval ? column.getDataType().toObject(ptr) : null;
+                assertEquals(row[i], o);
+            }
+        }
+        assertNull(iter.next());
+    }
+
+    private QueryPlan newLiteralResultIterationPlan(Integer offset, Integer limit) {
+        List<Tuple> tuples = Lists.newArrayList();
+
+        Tuple baseTuple = new SingleKeyValueTuple(KeyValue.LOWESTKEY);
+        for (Object[] row : RELATION) {
+            Expression[] exprs = new Expression[row.length];
+            for (int i = 0; i < row.length; i++) {
+                exprs[i] = LiteralExpression.newConstant(row[i]);
+            }
+            TupleProjector projector = new TupleProjector(exprs);
+            tuples.add(projector.projectResults(baseTuple));
+        }
+
+        return new LiteralResultIterationPlan(tuples, CONTEXT, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF,
+                RowProjector.EMPTY_PROJECTOR, limit, offset, OrderBy.EMPTY_ORDER_BY, null);
+    }
+
+    private TableRef createProjectedTableFromLiterals(Object[] row) {
+        List<PColumn> columns = Lists.<PColumn> newArrayList();
+        for (int i = 0; i < row.length; i++) {
+            String name = ParseNodeFactory.createTempAlias();
+            Expression expr = LiteralExpression.newConstant(row[i]);
+            columns.add(new PColumnImpl(PNameFactory.newName(name),
+                    PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), expr.getDataType(), expr.getMaxLength(),
+                    expr.getScale(), expr.isNullable(), i, expr.getSortOrder(), null, null, false, name, false, false));
+        }
+        try {
+            PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, PTableType.SUBQUERY, null,
+                    MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, null, null, columns, null, null,
+                    Collections.<PTable> emptyList(), false, Collections.<PName> emptyList(), null, null, false, false,
+                    false, null, null, null, true, false, 0, 0L);
+            TableRef sourceTable = new TableRef(pTable);
+            List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList();
+            for (PColumn column : sourceTable.getTable().getColumns()) {
+                sourceColumnRefs.add(new ColumnRef(sourceTable, column.getPosition()));
+            }
+
+            return new TableRef(TupleProjectionCompiler.createProjectedTable(sourceTable, sourceColumnRefs, false));
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
index 8b2b096..51b8868 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
@@ -112,7 +112,8 @@ public class UnnestArrayPlanTest {
     private void testUnnestArrays(PArrayDataType arrayType, List<Object[]> arrays, boolean withOrdinality) throws Exception {
         PDataType baseType = PDataType.fromTypeId(arrayType.getSqlType() - PDataType.ARRAY_TYPE_BASE);
         List<Tuple> tuples = toTuples(arrayType, arrays);
-        LiteralResultIterationPlan subPlan = new LiteralResultIterationPlan(tuples, CONTEXT, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null);
+               LiteralResultIterationPlan subPlan = new LiteralResultIterationPlan(tuples, CONTEXT, SelectStatement.SELECT_ONE,
+                               TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, null, OrderBy.EMPTY_ORDER_BY, null);
         LiteralExpression dummy = LiteralExpression.newConstant(null, arrayType);
         RowKeyValueAccessor accessor = new RowKeyValueAccessor(Arrays.asList(dummy), 0);
         UnnestArrayPlan plan = new UnnestArrayPlan(subPlan, new RowKeyColumnExpression(dummy, accessor), withOrdinality);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/776eea9c/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 51073dd..ff06ff9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -407,6 +407,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
             }
 
             @Override
+            public Integer getOffset() {
+                return null;
+            }
+
+            @Override
             public OrderBy getOrderBy() {
                 return OrderBy.EMPTY_ORDER_BY;
             }

Reply via email to