PHOENIX-1763 Support building with HBase-1.1.0

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/98271b88
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/98271b88
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/98271b88

Branch: refs/heads/4.x-HBase-1.1
Commit: 98271b888c113f10e174205434e05d3b36b7eb67
Parents: bf01eb2
Author: Enis Soztutar <e...@apache.org>
Authored: Thu May 21 23:08:26 2015 -0700
Committer: Enis Soztutar <e...@apache.org>
Committed: Fri May 22 00:30:56 2015 -0700

----------------------------------------------------------------------
 phoenix-core/pom.xml                            | 17 +++--
 .../regionserver/IndexHalfStoreFileReader.java  | 31 ++++++--
 .../regionserver/IndexSplitTransaction.java     | 39 ++++++++--
 .../hbase/regionserver/LocalIndexMerger.java    |  3 +-
 .../cache/aggcache/SpillableGroupByCache.java   | 13 +++-
 .../phoenix/coprocessor/BaseRegionScanner.java  | 12 +--
 .../coprocessor/BaseScannerRegionObserver.java  | 77 +++++++++++---------
 .../coprocessor/DelegateRegionScanner.java      | 23 ++++--
 .../GroupedAggregateRegionObserver.java         | 53 ++++++++------
 .../coprocessor/HashJoinRegionScanner.java      | 60 ++++++++-------
 .../coprocessor/MetaDataRegionObserver.java     | 23 +++---
 .../phoenix/coprocessor/ScanRegionObserver.java | 11 ++-
 .../UngroupedAggregateRegionObserver.java       | 55 +++++++-------
 .../hbase/index/covered/data/LocalTable.java    |  2 +-
 .../index/covered/filter/FamilyOnlyFilter.java  |  6 +-
 .../index/scanner/FilteredKeyValueScanner.java  |  2 +-
 .../phoenix/index/PhoenixIndexBuilder.java      |  6 +-
 .../iterate/RegionScannerResultIterator.java    |  9 ++-
 .../phoenix/schema/stats/StatisticsScanner.java | 10 ++-
 .../hbase/ipc/PhoenixIndexRpcSchedulerTest.java |  6 +-
 .../index/covered/TestLocalTableState.java      |  1 -
 .../covered/filter/TestFamilyOnlyFilter.java    | 12 +--
 .../index/write/TestWALRecoveryCaching.java     |  4 +-
 phoenix-flume/pom.xml                           |  9 ---
 phoenix-pig/pom.xml                             | 31 +++++---
 phoenix-spark/pom.xml                           |  7 ++
 pom.xml                                         | 41 ++++++++++-
 27 files changed, 361 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
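
Most of the Java churn below tracks a single interface change in HBase 1.1: RegionScanner.next() and nextRaw() take a ScannerContext instead of an int limit, and implementations must expose getBatch(). A minimal sketch of a delegating scanner against the 1.1 API (class name hypothetical, not part of this commit):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;
    import org.apache.hadoop.hbase.regionserver.ScannerContext;

    // Sketch of the three members whose signatures are new in HBase 1.1. Every
    // Phoenix scanner wrapper touched below (BaseRegionScanner,
    // DelegateRegionScanner, HashJoinRegionScanner, the anonymous scanners in
    // the observers) makes essentially this change.
    abstract class DelegatingScanner11 implements RegionScanner {
        protected final RegionScanner delegate;

        DelegatingScanner11(RegionScanner delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
            return delegate.next(result, scannerContext); // was next(List<Cell>, int) before 1.1
        }

        @Override
        public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
            return delegate.nextRaw(result, scannerContext); // was nextRaw(List<Cell>, int) before 1.1
        }

        @Override
        public int getBatch() {
            return delegate.getBatch(); // new to RegionScanner in 1.1
        }
    }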


http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 45b8d73..22e6b60 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -350,16 +350,25 @@
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-it</artifactId>
-      <version>${hbase.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-common</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-protocol</artifactId>
     </dependency>
     <dependency>
@@ -369,18 +378,16 @@
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-server</artifactId>
-      <version>${hbase.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-server</artifactId>
-      <version>${hbase.version}</version>
       <type>test-jar</type>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-hadoop-compat</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
@@ -391,13 +398,11 @@
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-hadoop2-compat</artifactId>
-      <version>${hbase.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-hadoop2-compat</artifactId>
-      <version>${hbase.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
index 49e2022..9befc8c 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
@@ -47,11 +47,11 @@ import org.apache.phoenix.index.IndexMaintainer;
  * that sort lowest and 'top' is the second half of the file with keys that sort greater than those
  * of the bottom half. The top includes the split files midkey, of the key that follows if it does
  * not exist in the file.
- * 
+ *
  * <p>
  * This type works in tandem with the {@link Reference} type. This class is used reading while
  * Reference is used writing.
- * 
+ *
  * <p>
  * This file is not splitable. Calls to {@link #midkey()} return null.
  */
@@ -64,7 +64,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
     private final byte[] splitkey;
     private final byte[] splitRow;
     private final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers;
-    private final byte[][] viewConstants; 
+    private final byte[][] viewConstants;
     private final int offset;
     private final HRegionInfo regionInfo;
     private final byte[] regionStartKeyInHFile;
@@ -144,6 +144,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
             final HFileScanner delegate = s;
             public boolean atEnd = false;
 
+            @Override
             public ByteBuffer getKey() {
                 if (atEnd) {
                     return null;
@@ -160,7 +161,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
                 // If it is top store file replace the StartKey of the Key with SplitKey
                 return getChangedKey(delegate.getKeyValue(), changeBottomKeys);
             }
-            
+
             private ByteBuffer getChangedKey(Cell kv, boolean changeBottomKeys) {
                 // new KeyValue(row, family, qualifier, timestamp, type, value)
                 byte[] newRowkey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(kv, changeBottomKeys);
@@ -183,6 +184,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
                 return keyReplacedStartKey;
             }
 
+            @Override
             public String getKeyString() {
                 if (atEnd) {
                     return null;
@@ -190,6 +192,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
                 return Bytes.toStringBinary(getKey());
             }
 
+            @Override
             public ByteBuffer getValue() {
                 if (atEnd) {
                     return null;
@@ -197,6 +200,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
                 return delegate.getValue();
             }
 
+            @Override
             public String getValueString() {
                 if (atEnd) {
                     return null;
@@ -204,6 +208,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
                 return Bytes.toStringBinary(getValue());
             }
 
+            @Override
             public Cell getKeyValue() {
                 if (atEnd) {
                     return null;
@@ -227,6 +232,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
                 return changedKv;
             }
 
+            @Override
             public boolean next() throws IOException {
                 if (atEnd) {
                     return false;
@@ -248,10 +254,12 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
                 }
             }
 
+            @Override
             public boolean seekBefore(byte[] key) throws IOException {
                 return seekBefore(key, 0, key.length);
             }
 
+            @Override
             public boolean seekBefore(byte[] key, int offset, int length) throws IOException {
 
                 if (top) {
@@ -282,6 +290,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
                 return seekBefore(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
             }
 
+            @Override
             public boolean seekTo() throws IOException {
                 boolean b = delegate.seekTo();
                 if (!b) {
@@ -302,10 +311,12 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
                 }
             }
 
+            @Override
             public int seekTo(byte[] key) throws IOException {
                 return seekTo(key, 0, key.length);
             }
 
+            @Override
             public int seekTo(byte[] key, int offset, int length) throws IOException {
                 if (top) {
                     if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0) {
@@ -342,10 +353,12 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
                 return seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
             }
 
+            @Override
             public int reseekTo(byte[] key) throws IOException {
                 return reseekTo(key, 0, key.length);
             }
 
+            @Override
             public int reseekTo(byte[] key, int offset, int length) throws IOException {
                 if (top) {
                     if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0) {
@@ -375,11 +388,13 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
                 return reseekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
             }
 
+            @Override
             public org.apache.hadoop.hbase.io.hfile.HFile.Reader getReader() {
                 return this.delegate.getReader();
             }
 
             // TODO: Need to change as per IndexHalfStoreFileReader
+            @Override
             public boolean isSeeked() {
                 return this.delegate.isSeeked();
             }
@@ -425,13 +440,13 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
     /**
      * In case of top half store, the passed key will be with the start key of the daughter region.
      * But in the actual HFiles, the key will be with the start key of the old parent region. In
-     * order to make the real seek in the HFiles, we need to build the old key. 
-     * 
+     * order to make the real seek in the HFiles, we need to build the old key.
+     *
      * The logic here is just replace daughter region start key with parent region start key
      * in the key part.
-     * 
+     *
      * @param key
-     * 
+     *
      */
     private KeyValue getKeyPresentInHFiles(byte[] key) {
         KeyValue keyValue = new KeyValue(key);
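
The javadoc above spells out the trick this reader depends on: after a split the daughter region keeps serving the parent's HFile, so a seek against a daughter key must first be rewritten to use the parent region's start key. A rough sketch of that translation under those assumptions (helper name hypothetical; the real logic lives in getKeyPresentInHFiles()):

    import org.apache.hadoop.hbase.util.Bytes;

    // Hypothetical sketch of the row-key rewrite described above: drop the
    // daughter region's start-key prefix and prepend the parent region's start
    // key, so the seek matches what is physically stored in the parent HFile.
    final class RowKeySwap {
        static byte[] daughterToParentRow(byte[] daughterRow, byte[] daughterStartKey,
                byte[] parentStartKey) {
            byte[] remainder = Bytes.copy(daughterRow, daughterStartKey.length,
                    daughterRow.length - daughterStartKey.length);
            return Bytes.add(parentStartKey, remainder);
        }
    }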

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
index 920380b..3057a14 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
@@ -165,6 +165,7 @@ public class IndexSplitTransaction extends SplitTransaction {
    * @return <code>true</code> if the region is splittable else
    * <code>false</code> if it is not (e.g. its already closed, etc.).
    */
+  @Override
   public boolean prepare() {
     if (!this.parent.isSplittable()) return false;
     // Split key can be null if this region is unsplittable; i.e. has refs.
@@ -215,6 +216,7 @@ public class IndexSplitTransaction extends SplitTransaction {
    *    Call {@link #rollback(Server, RegionServerServices)}
    * @return Regions created
    */
+  @Override
   /* package */PairOfSameType<HRegion> createDaughters(final Server server,
       final RegionServerServices services) throws IOException {
     LOG.info("Starting split of region " + this.parent);
@@ -288,16 +290,19 @@ public class IndexSplitTransaction extends SplitTransaction {
       if (metaEntries == null || metaEntries.isEmpty()) {
         MetaTableAccessor.splitRegion(server.getConnection(), parent.getRegionInfo(),
                 daughterRegions.getFirst().getRegionInfo(),
-                daughterRegions.getSecond().getRegionInfo(), server.getServerName());
+                daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
+                parent.getTableDesc().getRegionReplication());
       } else {
         offlineParentInMetaAndputMetaEntries(server.getConnection(),
           parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
-              .getSecond().getRegionInfo(), server.getServerName(), metaEntries);
+              .getSecond().getRegionInfo(), server.getServerName(), metaEntries,
+              parent.getTableDesc().getRegionReplication());
       }
     }
     return daughterRegions;
   }
 
+  @Override
   public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
       final RegionServerServices services, boolean testing) throws IOException {
     // Set ephemeral SPLITTING znode up in zk.  Mocked servers sometimes don't
@@ -380,6 +385,7 @@ public class IndexSplitTransaction extends SplitTransaction {
    * @throws IOException If thrown, transaction failed.
    *          Call {@link #rollback(Server, RegionServerServices)}
    */
+  @Override
   /* package */void openDaughters(final Server server,
       final RegionServerServices services, HRegion a, HRegion b)
       throws IOException {
@@ -565,6 +571,7 @@ public class IndexSplitTransaction extends SplitTransaction {
    * @throws IOException
    * @see #rollback(Server, RegionServerServices)
    */
+  @Override
   public PairOfSameType<HRegion> execute(final Server server,
       final RegionServerServices services)
   throws IOException {
@@ -575,6 +582,7 @@ public class IndexSplitTransaction extends SplitTransaction {
     return stepsAfterPONR(server, services, regions);
   }
 
+  @Override
   public PairOfSameType<HRegion> stepsAfterPONR(final Server server,
       final RegionServerServices services, PairOfSameType<HRegion> regions)
       throws IOException {
@@ -585,7 +593,7 @@ public class IndexSplitTransaction extends SplitTransaction {
 
   private void offlineParentInMetaAndputMetaEntries(Connection conn,
       HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
-      ServerName serverName, List<Mutation> metaEntries) throws IOException {
+      ServerName serverName, List<Mutation> metaEntries, int regionReplication) throws IOException {
     List<Mutation> mutations = metaEntries;
     HRegionInfo copyOfParent = new HRegionInfo(parent);
     copyOfParent.setOffline(true);
@@ -595,7 +603,7 @@ public class IndexSplitTransaction extends SplitTransaction {
     Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent);
     MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB);
     mutations.add(putParent);
-    
+
     //Puts for daughters
     Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA);
     Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB);
@@ -604,9 +612,18 @@ public class IndexSplitTransaction extends SplitTransaction {
     addLocation(putB, serverName, 1);
     mutations.add(putA);
     mutations.add(putB);
+
+    // Add empty locations for region replicas of daughters so that number of replicas can be
+    // cached whenever the primary region is looked up from meta
+    for (int i = 1; i < regionReplication; i++) {
+      addEmptyLocation(putA, i);
+      addEmptyLocation(putB, i);
+    }
+
     MetaTableAccessor.mutateMetaTable(conn, mutations);
   }
 
+  @Override
   public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
       Bytes.toBytes(sn.getHostAndPort()));
@@ -617,6 +634,13 @@ public class IndexSplitTransaction extends SplitTransaction {
     return p;
   }
 
+  private static Put addEmptyLocation(final Put p, int replicaId){
+    p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId), null);
+    p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId), null);
+    p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId), null);
+    return p;
+  }
+
   /*
    * Open daughter region in its own thread.
    * If we fail, abort this hosting server.
@@ -659,6 +683,7 @@ public class IndexSplitTransaction extends SplitTransaction {
    * @throws IOException
    * @throws KeeperException
    */
+  @Override
   void openDaughterRegion(final Server server, final HRegion daughter)
   throws IOException, KeeperException {
     HRegionInfo hri = daughter.getRegionInfo();
@@ -767,6 +792,7 @@ public class IndexSplitTransaction extends SplitTransaction {
       this.family = family;
     }
 
+    @Override
     public Void call() throws IOException {
       splitStoreFile(family, sf);
       return null;
@@ -807,6 +833,7 @@ public class IndexSplitTransaction extends SplitTransaction {
    * @return True if we successfully rolled back, false if we got to the point
    * of no return and so now need to abort the server to minimize damage.
    */
+  @Override
   @SuppressWarnings("deprecation")
   public boolean rollback(final Server server, final RegionServerServices services)
   throws IOException {
@@ -879,10 +906,12 @@ public class IndexSplitTransaction extends SplitTransaction {
     return result;
   }
 
+  @Override
   HRegionInfo getFirstDaughter() {
     return hri_a;
   }
 
+  @Override
   HRegionInfo getSecondDaughter() {
     return hri_b;
   }
@@ -971,7 +1000,7 @@ public class IndexSplitTransaction extends SplitTransaction {
     return ZKAssign.transitionNode(zkw, parent, serverName,
       beginState, endState, znodeVersion, payload);
   }
-  
+
   public HRegion getParent() {
     return this.parent;
   }
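
The new addEmptyLocation() above complements the extra regionReplication argument that HBase 1.1's MetaTableAccessor.splitRegion() takes: each daughter's meta row gets one null-valued server/startcode/seqnum column triple per additional replica, so a single meta lookup of the primary also reveals the replica count. The loop in isolation, as a hedged sketch (class name hypothetical):

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.MetaTableAccessor;
    import org.apache.hadoop.hbase.client.Put;

    // Mirrors the addEmptyLocation() calls above: replica 0 is the primary, so
    // empty locations are written only for replica ids 1..regionReplication-1.
    final class ReplicaMetaColumns {
        static Put addEmptyReplicaLocations(Put p, int regionReplication) {
            for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
                p.addImmutable(HConstants.CATALOG_FAMILY,
                        MetaTableAccessor.getServerColumn(replicaId), null);
                p.addImmutable(HConstants.CATALOG_FAMILY,
                        MetaTableAccessor.getStartCodeColumn(replicaId), null);
                p.addImmutable(HConstants.CATALOG_FAMILY,
                        MetaTableAccessor.getSeqNumColumn(replicaId), null);
            }
            return p;
        }
    }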

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java
index f074df7..add9b72 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java
@@ -81,7 +81,8 @@ public class LocalIndexMerger extends BaseRegionServerObserver {
                 this.mergedRegion = rmt.stepsBeforePONR(rss, rss, false);
                 rmt.prepareMutationsForMerge(mergedRegion.getRegionInfo(),
                     indexRegionA.getRegionInfo(), indexRegionB.getRegionInfo(),
-                    rss.getServerName(), metaEntries);
+                    rss.getServerName(), metaEntries,
+                    mergedRegion.getTableDesc().getRegionReplication());
             } catch (Exception e) {
                 ctx.bypass();
                 LOG.warn("index regions merge failed with the exception ", e);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
index ce18cc2..69fc6f6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
@@ -120,7 +120,7 @@ public class SpillableGroupByCache implements GroupByCache {
 
     /**
      * Instantiates a Loading LRU Cache that stores key / aggregator[] tuples used for group by queries
-     * 
+     *
      * @param estSize
      * @param estValueSize
      * @param aggs
@@ -325,7 +325,7 @@ public class SpillableGroupByCache implements GroupByCache {
 
     /**
      * Closes cache and releases spill resources
-     * 
+     *
      * @throws IOException
      */
     @Override
@@ -358,7 +358,9 @@ public class SpillableGroupByCache implements GroupByCache {
 
             @Override
             public boolean next(List<Cell> results) throws IOException {
-                if (!cacheIter.hasNext()) { return false; }
+                if (!cacheIter.hasNext()) {
+                    return false;
+                }
                 Map.Entry<ImmutableBytesWritable, Aggregator[]> ce = cacheIter.next();
                 ImmutableBytesWritable key = ce.getKey();
                 Aggregator[] aggs = ce.getValue();
@@ -377,6 +379,11 @@ public class SpillableGroupByCache implements GroupByCache {
             public long getMaxResultSize() {
               return s.getMaxResultSize();
             }
+
+            @Override
+            public int getBatch() {
+                return s.getBatch();
+            }
         };
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
index ff9ac76..828f776 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
@@ -22,14 +22,14 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 
 public abstract class BaseRegionScanner implements RegionScanner {
 
     @Override
     public boolean isFilterDone() {
-        return false; 
+        return false;
     }
 
     @Override
@@ -38,10 +38,10 @@ public abstract class BaseRegionScanner implements RegionScanner {
     }
 
     @Override
-    public boolean next(List<Cell> result, int limit) throws IOException {
+    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
         return next(result);
     }
-    
+
     @Override
     public boolean reseek(byte[] row) throws IOException {
         throw new DoNotRetryIOException("Unsupported");
@@ -58,7 +58,7 @@ public abstract class BaseRegionScanner implements RegionScanner {
     }
 
     @Override
-    public boolean nextRaw(List<Cell> result, int limit) throws IOException {
-        return next(result, limit);
+    public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+        return next(result, scannerContext);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index a2269b4..fc74968 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
@@ -60,7 +61,7 @@ import com.google.common.collect.ImmutableList;
 
 
 abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
-    
+
     public static final String AGGREGATORS = "_Aggs";
     public static final String UNORDERED_GROUP_BY_EXPRESSIONS = "_UnorderedGroupByExpressions";
     public static final String KEY_ORDERED_GROUP_BY_EXPRESSIONS = "_OrderedGroupByExpressions";
@@ -91,7 +92,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
      * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
      * are used to augment log lines emitted by Phoenix. See https://issues.apache.org/jira/browse/PHOENIX-1198.
      */
-    public static final String CUSTOM_ANNOTATIONS = "_Annot"; 
+    public static final String CUSTOM_ANNOTATIONS = "_Annot";
 
     /** Exposed for testing */
     public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server";
@@ -111,8 +112,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public String toString() {
         return this.getClass().getName();
     }
-    
-    
+
+
     private static void throwIfScanOutOfRegion(Scan scan, HRegion region) throws DoNotRetryIOException {
         boolean isLocalIndex = ScanUtil.isLocalIndex(scan);
         byte[] lowerInclusiveScanKey = scan.getStartRow();
@@ -136,7 +137,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
 
     abstract protected boolean isRegionObserverFor(Scan scan);
     abstract protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable;
-    
+
     @Override
     public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
         final Scan scan, final RegionScanner s) throws IOException {
@@ -153,7 +154,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     /**
      * Wrapper for {@link #postScannerOpen(ObserverContext, Scan, RegionScanner)} that ensures no non IOException is thrown,
      * to prevent the coprocessor from becoming blacklisted.
-     * 
+     *
      */
     @Override
     public final RegionScanner postScannerOpen(
@@ -165,10 +166,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             }
             boolean success =false;
             // Save the current span. When done with the child span, reset the span back to
-            // what it was. Otherwise, this causes the thread local storing the current span 
+            // what it was. Otherwise, this causes the thread local storing the current span
             // to not be reset back to null causing catastrophic infinite loops
             // and region servers to crash. See https://issues.apache.org/jira/browse/PHOENIX-1596
-            // TraceScope can't be used here because closing the scope will end up calling 
+            // TraceScope can't be used here because closing the scope will end up calling
             // currentSpan.stop() and that should happen only when we are closing the scanner.
             final Span savedSpan = Trace.currentSpan();
             final Span child = Trace.startSpan(SCANNER_OPENED_TRACE_INFO, savedSpan).getSpan();
@@ -226,7 +227,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
         return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector,
                 dataRegion, indexMaintainer, viewConstants, null, null, projector, ptr);
     }
-    
+
     /**
      * Return wrapped scanner that catches unexpected exceptions (i.e. Phoenix bugs) and
      * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query
@@ -246,7 +247,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             final Expression[] arrayFuncRefs, final int offset, final Scan scan,
             final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
             final HRegion dataRegion, final IndexMaintainer indexMaintainer,
-            final byte[][] viewConstants, final KeyValueSchema kvSchema, 
+            final byte[][] viewConstants, final KeyValueSchema kvSchema,
             final ValueBitSet kvSchemaBitSet, final TupleProjector projector,
             final ImmutableBytesWritable ptr) {
         return new RegionScanner() {
@@ -262,9 +263,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             }
 
             @Override
-            public boolean next(List<Cell> result, int limit) throws IOException {
+            public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
                 try {
-                    return s.next(result, limit);
+                    return s.next(result, scannerContext);
                 } catch (Throwable t) {
                     ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
                     return false; // impossible
@@ -324,30 +325,31 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             }
 
             @Override
-            public boolean nextRaw(List<Cell> result, int limit) throws IOException {
-                try {
-                    boolean next = s.nextRaw(result, limit);
-                    if (result.size() == 0) {
-                        return next;
-                    }
-                    if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
-                        replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
-                    }
-                    if ((offset > 0 || ScanUtil.isLocalIndex(scan))  && !ScanUtil.isAnalyzeTable(scan)) {
-                        IndexUtil.wrapResultUsingOffset(c, result, offset, dataColumns,
-                            tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
-                    }
-                    if (projector != null) {
-                        Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result)));
-                        result.clear();
-                        result.add(tuple.getValue(0));
-                    }
-                    // There is a scanattribute set to retrieve the specific array element
+            public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
+                throws IOException {
+              try {
+                boolean next = s.nextRaw(result, scannerContext);
+                if (result.size() == 0) {
                     return next;
-                } catch (Throwable t) {
-                    ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
-                    return false; // impossible
                 }
+                if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
+                    replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
+                }
+                if ((offset > 0 || ScanUtil.isLocalIndex(scan))  && !ScanUtil.isAnalyzeTable(scan)) {
+                    IndexUtil.wrapResultUsingOffset(c, result, offset, dataColumns,
+                        tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
+                }
+                if (projector != null) {
+                    Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result)));
+                    result.clear();
+                    result.add(tuple.getValue(0));
+                }
+                // There is a scanattribute set to retrieve the specific array element
+                return next;
+            } catch (Throwable t) {
+                ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+                return false; // impossible
+            }
             }
 
             private void replaceArrayIndexElement(final Set<KeyValueColumnExpression> arrayKVRefs,
@@ -387,6 +389,11 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             public long getMaxResultSize() {
                 return s.getMaxResultSize();
             }
+
+            @Override
+            public int getBatch() {
+                return s.getBatch();
+            }
         };
     }
-}
\ No newline at end of file
+}
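
The PHOENIX-1596 comment in the hunk above explains why postScannerOpen() manages spans by hand: TraceScope.close() would stop the current span, but here the child span must stay open for the scanner's lifetime and be stopped manually only if scanner creation fails. The save/restore pattern in isolation, using only the htrace calls visible above (helper hypothetical):

    import java.io.IOException;

    import org.apache.htrace.Span;
    import org.apache.htrace.Trace;

    // Hypothetical distillation of the pattern in postScannerOpen(): remember
    // the caller's span, start a child span for the scanner, and stop the child
    // ourselves on failure rather than letting a TraceScope stop it too early.
    final class TracedOpen {
        interface Factory<T> {
            T open() throws IOException;
        }

        static <T> T openTraced(String description, Factory<T> factory) throws IOException {
            final Span savedSpan = Trace.currentSpan();
            final Span child = Trace.startSpan(description, savedSpan).getSpan();
            boolean success = false;
            try {
                T scanner = factory.open();
                success = true;
                return scanner;
            } finally {
                if (!success && child != null) {
                    child.stop(); // abandon the child; savedSpan remains the current span
                }
            }
        }
    }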

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index f88a931..43c35a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 
 public class DelegateRegionScanner implements RegionScanner {
 
@@ -56,23 +57,33 @@ public class DelegateRegionScanner implements RegionScanner {
         delegate.close();
     }
 
+    @Override
     public long getMaxResultSize() {
         return delegate.getMaxResultSize();
     }
 
-    public boolean next(List<Cell> arg0, int arg1) throws IOException {
-        return delegate.next(arg0, arg1);
+    @Override
+    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+        return delegate.next(result, scannerContext);
     }
 
-    public boolean next(List<Cell> arg0) throws IOException {
-        return delegate.next(arg0);
+    @Override
+    public boolean next(List<Cell> result) throws IOException {
+        return delegate.next(result);
     }
 
-    public boolean nextRaw(List<Cell> arg0, int arg1) throws IOException {
-        return delegate.nextRaw(arg0, arg1);
+    @Override
+    public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+        return delegate.nextRaw(result, scannerContext);
     }
 
+    @Override
     public boolean nextRaw(List<Cell> arg0) throws IOException {
         return delegate.nextRaw(arg0);
     }
+
+    @Override
+    public int getBatch() {
+        return delegate.getBatch();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 1f1ba36..19a1663 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -80,7 +80,7 @@ import com.google.common.collect.Maps;
 
 /**
  * Region observer that aggregates grouped rows (i.e. SQL query with GROUP BY clause)
- * 
+ *
  * @since 0.1
  */
 public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
@@ -116,7 +116,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             offset = region.getStartKey().length != 0 ? region.getStartKey().length:region.getEndKey().length;
             ScanUtil.setRowKeyOffset(scan, offset);
         }
-        
+
         List<Expression> expressions = deserializeGroupByExpressions(expressionBytes, 0);
         ServerAggregators aggregators =
                 ServerAggregators.deserialize(scan
@@ -124,7 +124,7 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver {
                         .getEnvironment().getConfiguration());
 
         RegionScanner innerScanner = s;
-        
+
         byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
         List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
         TupleProjector tupleProjector = null;
@@ -142,9 +142,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             }
             ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
             innerScanner =
-                    getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector, 
+                    getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector,
                             dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
-        } 
+        }
 
         if (j != null) {
             innerScanner =
@@ -223,13 +223,13 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
     }
 
     /**
-     * 
+     *
      * Cache for distinct values and their aggregations which is completely
      * in-memory (as opposed to spilling to disk). Used when GROUPBY_SPILLABLE_ATTRIB
      * is set to false. The memory usage is tracked at a coursed grain and will
      * throw and abort if too much is used.
      *
-     * 
+     *
      * @since 3.0.0
      */
     private static final class InMemoryGroupByCache implements GroupByCache {
@@ -238,9 +238,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
         private final ServerAggregators aggregators;
         private final RegionCoprocessorEnvironment env;
         private final byte[] customAnnotations;
-        
+
         private int estDistVals;
-        
+
         InMemoryGroupByCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, byte[] customAnnotations, ServerAggregators aggregators, int estDistVals) {
             int estValueSize = aggregators.getEstimatedByteSize();
             long estSize = sizeOfUnorderedGroupByMap(estDistVals, estValueSize);
@@ -252,7 +252,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             this.chunk = tenantCache.getMemoryManager().allocate(estSize);
             this.customAnnotations = customAnnotations;
         }
-        
+
         @Override
         public void close() throws IOException {
             this.chunk.close();
@@ -291,7 +291,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             chunk.resize(estSize);
 
             final List<KeyValue> aggResults = new ArrayList<KeyValue>(aggregateMap.size());
-            
+
             final Iterator<Map.Entry<ImmutableBytesPtr, Aggregator[]>> cacheIter =
                     aggregateMap.entrySet().iterator();
             while (cacheIter.hasNext()) {
@@ -333,7 +333,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
 
                 @Override
                 public boolean next(List<Cell> results) throws IOException {
-                    if (index >= aggResults.size()) return false;
+                    if (index >= aggResults.size()) {
+                        return false;
+                    }
                     results.add(aggResults.get(index));
                     index++;
                     return index < aggResults.size();
@@ -343,6 +345,11 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                 public long getMaxResultSize() {
                        return s.getMaxResultSize();
                 }
+
+                @Override
+                public int getBatch() {
+                    return s.getBatch();
+                }
             };
         }
 
@@ -350,22 +357,22 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
         public long size() {
             return aggregateMap.size();
         }
-        
+
     }
     private static final class GroupByCacheFactory {
         public static final GroupByCacheFactory INSTANCE = new GroupByCacheFactory();
-        
+
         private GroupByCacheFactory() {
         }
-        
+
         GroupByCache newCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, byte[] customAnnotations, ServerAggregators aggregators, int estDistVals) {
             Configuration conf = env.getConfiguration();
             boolean spillableEnabled =
                     conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
             if (spillableEnabled) {
                 return new SpillableGroupByCache(env, tenantId, aggregators, estDistVals);
-            } 
-            
+            }
+
             return new InMemoryGroupByCache(env, tenantId, customAnnotations, aggregators, estDistVals);
         }
     }
@@ -388,14 +395,14 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
         byte[] estDistValsBytes = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES);
         if (estDistValsBytes != null) {
             // Allocate 1.5x estimation
-            estDistVals = Math.max(MIN_DISTINCT_VALUES, 
+            estDistVals = Math.max(MIN_DISTINCT_VALUES,
                             (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
         }
 
         final boolean spillableEnabled =
                 conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
 
-        GroupByCache groupByCache = 
+        GroupByCache groupByCache =
                 GroupByCacheFactory.INSTANCE.newCache(
                         env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan),
                         aggregators, estDistVals);
@@ -453,7 +460,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
      * Used for an aggregate query in which the key order match the group by key order. In this
      * case, we can do the aggregation as we scan, by detecting when the group by key changes.
      * @param limit TODO
-     * @throws IOException 
+     * @throws IOException
      */
     private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
             final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
@@ -559,11 +566,15 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                 currentKey = null;
                 return false;
             }
-            
+
             @Override
             public long getMaxResultSize() {
                 return scanner.getMaxResultSize();
             }
+            @Override
+            public int getBatch() {
+                return scanner.getBatch();
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index cdfc771..1e34d96 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.HashCache;
@@ -48,7 +49,7 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.TupleUtil;
 
 public class HashJoinRegionScanner implements RegionScanner {
-    
+
     private final RegionScanner scanner;
     private final TupleProjector projector;
     private final HashJoinInfo joinInfo;
@@ -60,7 +61,7 @@ public class HashJoinRegionScanner implements RegionScanner {
     private List<Tuple>[] tempTuples;
     private ValueBitSet tempDestBitSet;
     private ValueBitSet[] tempSrcBitSet;
-    
+
     @SuppressWarnings("unchecked")
     public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesWritable tenantId, RegionCoprocessorEnvironment env) throws IOException {
         this.scanner = scanner;
@@ -92,8 +93,8 @@ public class HashJoinRegionScanner implements RegionScanner {
             }
             HashCache hashCache = (HashCache)cache.getServerCache(joinId);
             if (hashCache == null)
-                throw new DoNotRetryIOException("Could not find hash cache for joinId: " 
-                        + Bytes.toString(joinId.get(), joinId.getOffset(), joinId.getLength()) 
+                throw new DoNotRetryIOException("Could not find hash cache for joinId: "
+                        + Bytes.toString(joinId.get(), joinId.getOffset(), joinId.getLength())
                         + ". The cache might have expired and have been removed.");
             hashCaches[i] = hashCache;
             tempSrcBitSet[i] = ValueBitSet.newInstance(joinInfo.getSchemas()[i]);
@@ -103,18 +104,19 @@ public class HashJoinRegionScanner implements RegionScanner {
             this.projector.setValueBitSet(tempDestBitSet);
         }
     }
-    
+
     private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException {
         if (result.isEmpty())
             return;
-        
+
         Tuple tuple = new ResultTuple(Result.create(result));
         // For backward compatibility. In new versions, HashJoinInfo.forceProjection()
         // always returns true.
         if (joinInfo.forceProjection()) {
             tuple = projector.projectResults(tuple);
         }
-        
+
+        // TODO: fix below Scanner.next() and Scanner.nextRaw() methods as well.
         if (hasBatchLimit)
             throw new UnsupportedOperationException("Cannot support join operations in scans with limit");
 
@@ -157,7 +159,7 @@ public class HashJoinRegionScanner implements RegionScanner {
                         Tuple lhs = resultQueue.poll();
                         if (!earlyEvaluation) {
                             ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(lhs, joinInfo.getJoinExpressions()[i]);
-                            tempTuples[i] = hashCaches[i].get(key);                            
+                            tempTuples[i] = hashCaches[i].get(key);
                             if (tempTuples[i] == null) {
                                 if (type == JoinType.Inner || type == JoinType.Semi) {
                                     continue;
@@ -171,7 +173,7 @@ public class HashJoinRegionScanner implements RegionScanner {
                             Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
                                     lhs : TupleProjector.mergeProjectedValue(
                                             (ProjectedValueTuple) lhs, schema, tempDestBitSet,
-                                            null, joinInfo.getSchemas()[i], tempSrcBitSet[i], 
+                                            null, joinInfo.getSchemas()[i], tempSrcBitSet[i],
                                             joinInfo.getFieldPositions()[i]);
                             resultQueue.offer(joined);
                             continue;
@@ -180,7 +182,7 @@ public class HashJoinRegionScanner implements RegionScanner {
                             Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
                                     lhs : TupleProjector.mergeProjectedValue(
                                             (ProjectedValueTuple) lhs, schema, tempDestBitSet,
-                                            t, joinInfo.getSchemas()[i], tempSrcBitSet[i], 
+                                            t, joinInfo.getSchemas()[i], tempSrcBitSet[i],
                                             joinInfo.getFieldPositions()[i]);
                             resultQueue.offer(joined);
                         }
@@ -211,18 +213,19 @@ public class HashJoinRegionScanner implements RegionScanner {
             }
         }
     }
-    
+
     private boolean shouldAdvance() {
         if (!resultQueue.isEmpty())
             return false;
-        
+
         return hasMore;
     }
-    
+
     private boolean nextInQueue(List<Cell> results) {
-        if (resultQueue.isEmpty())
+        if (resultQueue.isEmpty()) {
             return false;
-        
+        }
+
         Tuple tuple = resultQueue.poll();
         for (int i = 0; i < tuple.size(); i++) {
             results.add(tuple.getValue(i));
@@ -252,19 +255,19 @@ public class HashJoinRegionScanner implements RegionScanner {
             processResults(result, false);
             result.clear();
         }
-        
+
         return nextInQueue(result);
     }
 
     @Override
-    public boolean nextRaw(List<Cell> result, int limit)
+    public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
             throws IOException {
         while (shouldAdvance()) {
-            hasMore = scanner.nextRaw(result, limit);
-            processResults(result, true);
+            hasMore = scanner.nextRaw(result, scannerContext);
+            processResults(result, false); // TODO fix honoring the limit
             result.clear();
         }
-        
+
         return nextInQueue(result);
     }
 
@@ -285,19 +288,19 @@ public class HashJoinRegionScanner implements RegionScanner {
             processResults(result, false);
             result.clear();
         }
-        
+
         return nextInQueue(result);
     }
 
     @Override
-    public boolean next(List<Cell> result, int limit) throws IOException {
+    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
         while (shouldAdvance()) {
-            hasMore = scanner.next(result, limit);
-            processResults(result, true);
+            hasMore = scanner.next(result, scannerContext);
+            processResults(result, false); // TODO honoring the limit
             result.clear();
         }
-        
-        return nextInQueue(result);
+
+      return nextInQueue(result);
     }
 
     @Override
@@ -305,5 +308,10 @@ public class HashJoinRegionScanner implements RegionScanner {
         return this.scanner.getMaxResultSize();
     }
 
+    @Override
+    public int getBatch() {
+        return this.scanner.getBatch();
+    }
+
 }
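
One behavioural note on the two hunks above: with the int limit gone, HashJoinRegionScanner can no longer see whether a batch limit is in force, so it now passes hasBatchLimit = false and leaves TODOs; ScannerContext carries that state in HBase 1.1 but does not expose it to coprocessor code. A sketch of the pre-1.1 shape these TODOs refer to (abstract shell, hypothetical):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;

    // Hypothetical pre-1.1 shell: the limit overloads passed hasBatchLimit =
    // true, letting processResults() reject joins under a batch limit up front.
    abstract class PreScannerContextJoinLoop {
        protected boolean hasMore;

        abstract boolean shouldAdvance();
        abstract boolean delegateNextRaw(List<Cell> result, int limit) throws IOException;
        abstract void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException;
        abstract boolean nextInQueue(List<Cell> result);

        boolean nextRaw(List<Cell> result, int limit) throws IOException {
            while (shouldAdvance()) {
                hasMore = delegateNextRaw(result, limit);
                processResults(result, true); // pre-1.1 behaviour the TODOs want restored
                result.clear();
            }
            return nextInQueue(result);
        }
    }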
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 6f1d5ac..c40e3cd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -69,20 +69,20 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
     protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
     private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD;
     private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL;
-  
+
     @Override
     public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
             boolean abortRequested) {
         executor.shutdownNow();
         GlobalCache.getInstance(c.getEnvironment()).getMetaDataCache().invalidateAll();
     }
-    
+
     @Override
     public void start(CoprocessorEnvironment env) throws IOException {
-        // sleep a little bit to compensate time clock skew when SYSTEM.CATALOG moves 
+        // sleep a little bit to compensate time clock skew when SYSTEM.CATALOG moves
         // among region servers because we relies on server time of RS which is hosting
         // SYSTEM.CATALOG
-        long sleepTime = env.getConfiguration().getLong(QueryServices.CLOCK_SKEW_INTERVAL_ATTRIB, 
+        long sleepTime = env.getConfiguration().getLong(QueryServices.CLOCK_SKEW_INTERVAL_ATTRIB,
             QueryServicesOptions.DEFAULT_CLOCK_SKEW_INTERVAL);
         try {
             if(sleepTime > 0) {
@@ -91,12 +91,12 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
         }
-        enableRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, 
+        enableRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB,
             QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD);
-        rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, 
+        rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB,
             QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL);
     }
-    
+
 
     @Override
     public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
@@ -119,7 +119,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
             LOG.error("BuildIndexScheduleTask cannot start!", ex);
         }
     }
-    
+
     /**
      * Task runs periodically to build indexes whose INDEX_NEED_PARTIALLY_REBUILD is set true
      *
@@ -133,7 +133,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
         public BuildIndexScheduleTask(RegionCoprocessorEnvironment env) {
             this.env = env;
         }
-      
+
         private String getJdbcUrl() {
             String zkQuorum = this.env.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM);
             String zkClientPort = this.env.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT,
@@ -144,7 +144,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                 + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkClientPort
                 + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkParentNode;
         }
-      
+
+        @Override
         public void run() {
             RegionScanner scanner = null;
             PhoenixConnection conn = null;
@@ -199,7 +200,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                         PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
                     if ((dataTable == null || dataTable.length == 0)
                             || (indexStat == null || indexStat.length == 0)
-                            || ((Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) != 0) 
+                            || ((Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) != 0)
                                     && (Bytes.compareTo(PIndexState.INACTIVE.getSerializedBytes(), indexStat) != 0))) {
                         // index has to be either in disable or inactive state
                         // data table name can't be empty

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index ddde407..77e124d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -199,7 +199,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
             indexMaintainer = indexMaintainers.get(0);
             viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
         }
-        
+
         final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
         innerScanner =
@@ -285,12 +285,12 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
                 } finally {
                     try {
                         if(iterator != null) {
-                            iterator.close();    
+                            iterator.close();
                         }
                     } catch (SQLException e) {
                         ServerUtil.throwIOException(region.getRegionNameAsString(), e);
                     } finally {
-                        chunk.close();                
+                        chunk.close();
                     }
                 }
             }
@@ -299,6 +299,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
             public long getMaxResultSize() {
                 return s.getMaxResultSize();
             }
+
+            @Override
+            public int getBatch() {
+              return s.getBatch();
+            }
         };
     }
 

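The getBatch() override added above exists because HBase 1.1 extended the RegionScanner interface with a getBatch() method, so every wrapper scanner Phoenix constructs must now implement it. A minimal sketch of the delegate pattern the commit applies throughout (DelegatingScanner is a hypothetical name, not a Phoenix class):

    import org.apache.hadoop.hbase.regionserver.RegionScanner;

    // Sketch: a wrapper compiles against the HBase 1.1 RegionScanner
    // interface only if it implements getBatch(); forwarding to the
    // wrapped scanner preserves the batch limit the client configured.
    abstract class DelegatingScanner implements RegionScanner {
        private final RegionScanner delegate;

        DelegatingScanner(RegionScanner delegate) {
            this.delegate = delegate;
        }

        @Override
        public int getBatch() {
            return delegate.getBatch();
        }
    }
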
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index e43e5e5..2d6d98a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -101,8 +101,8 @@ import com.google.common.collect.Sets;
 
 /**
  * Region observer that aggregates ungrouped rows(i.e. SQL query with aggregation function and no GROUP BY).
- * 
- * 
+ *
+ *
  * @since 0.1
  */
 public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
@@ -116,7 +116,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
     public static final String EMPTY_CF = "EmptyCF";
     private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
     private KeyValueBuilder kvBuilder;
-    
+
     @Override
     public void start(CoprocessorEnvironment e) throws IOException {
         super.start(e);
@@ -139,14 +139,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
     public static void serializeIntoScan(Scan scan) {
         scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, QueryConstants.TRUE);
     }
-    
+
     @Override
     public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
             throws IOException {
         s = super.preScannerOpen(e, scan, s);
         if (ScanUtil.isAnalyzeTable(scan)) {
             // We are setting the start row and stop row such that it covers the entire region. As part
-            // of Phonenix-1263 we are storing the guideposts against the physical table rather than 
+            // of Phonenix-1263 we are storing the guideposts against the physical table rather than
             // individual tenant specific tables.
             scan.setStartRow(HConstants.EMPTY_START_ROW);
             scan.setStopRow(HConstants.EMPTY_END_ROW);
@@ -154,7 +154,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
         }
         return s;
     }
-    
+
     @Override
     protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
         int offset = 0;
@@ -179,9 +179,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
         byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
         List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
         List<Mutation> indexMutations = localIndexBytes == null ? Collections.<Mutation>emptyList() : Lists.<Mutation>newArrayListWithExpectedSize(1024);
-        
+
         RegionScanner theScanner = s;
-        
+
         byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
         PTable projectedTable = null;
         List<Expression> selectExpressions = null;
@@ -226,14 +226,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
             }
             ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
             theScanner =
-                    getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector, 
+                    getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector,
                             dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
-        } 
-        
+        }
+
         if (j != null)  {
             theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), c.getEnvironment());
         }
-        
+
         int batchSize = 0;
         List<Mutation> mutations = Collections.emptyList();
         boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
@@ -330,7 +330,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                                         }
                                         column.getDataType().coerceBytes(ptr, value,
                                             expression.getDataType(), expression.getMaxLength(),
-                                            expression.getScale(), expression.getSortOrder(), 
+                                            expression.getScale(), expression.getSortOrder(),
                                             column.getMaxLength(), column.getScale(),
                                             column.getSortOrder());
                                         byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
@@ -418,7 +418,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                 }
             }
         }
-        
+
         if (logger.isDebugEnabled()) {
                logger.debug(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan)));
         }
@@ -438,7 +438,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
             keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
         }
         final KeyValue aggKeyValue = keyValue;
-        
+
         RegionScanner scanner = new BaseRegionScanner() {
             private boolean done = !hadAny;
 
@@ -464,11 +464,16 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                 results.add(aggKeyValue);
                 return false;
             }
-            
+
             @Override
             public long getMaxResultSize() {
                return scan.getMaxResultSize();
             }
+
+            @Override
+            public int getBatch() {
+                return innerScanner.getBatch();
+            }
         };
         return scanner;
     }
@@ -496,7 +501,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
         }
         indexMutations.clear();
     }
-    
+
     @Override
     public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c,
         final Store store, InternalScanner scanner, final ScanType scanType)
@@ -505,8 +510,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
         InternalScanner internalScanner = scanner;
         if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
             try {
-                boolean useCurrentTime = 
-                        c.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, 
+                boolean useCurrentTime =
+                        c.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
                                 QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
                 // Provides a means of clients controlling their timestamps to not use current time
                 // when background tasks are updating stats. Instead we track the max timestamp of
@@ -526,8 +531,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
         }
         return internalScanner;
     }
-    
-    
+
+
     @Override
     public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r)
             throws IOException {
@@ -535,8 +540,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
         TableName table = region.getRegionInfo().getTable();
         StatisticsCollector stats = null;
         try {
-            boolean useCurrentTime = 
-                    e.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, 
+            boolean useCurrentTime =
+                    e.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
                             QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
             // Provides a means of clients controlling their timestamps to not use current time
             // when background tasks are updating stats. Instead we track the max timestamp of
@@ -544,7 +549,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
             long clientTimeStamp = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : StatisticsCollector.NO_TIMESTAMP;
             stats = new StatisticsCollector(e.getEnvironment(), table.getNameAsString(), clientTimeStamp);
             stats.splitStats(region, l, r);
-        } catch (IOException ioe) { 
+        } catch (IOException ioe) {
             if(logger.isWarnEnabled()) {
                 logger.warn("Error while collecting stats during split for " + table,ioe);
             }
@@ -559,7 +564,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
             return PTableImpl.createFromProto(ptableProto);
         } catch (IOException e) {
             throw new RuntimeException(e);
-        } 
+        }
     }
 
     private static List<Expression> deserializeExpressions(byte[] b) {

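For context on the preCompact change above: the observer swaps in a statistics-collecting scanner only when the compaction pass drops deletes, since that is the pass that changes what statistics should reflect. A condensed, hedged sketch of that hook shape (StatsCompactionObserver and wrapForStats are illustrative names, not the Phoenix implementation):

    import java.io.IOException;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;
    import org.apache.hadoop.hbase.regionserver.ScanType;
    import org.apache.hadoop.hbase.regionserver.Store;

    public class StatsCompactionObserver extends BaseRegionObserver {
        @Override
        public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c,
                Store store, InternalScanner scanner, ScanType scanType) throws IOException {
            // Only delete-dropping compactions change what statistics should see.
            if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
                return wrapForStats(scanner);
            }
            return scanner;
        }

        // Placeholder for Phoenix's StatisticsCollector-based wrapping.
        private InternalScanner wrapForStats(InternalScanner scanner) {
            return scanner;
        }
    }
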
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
index 3469042..71cc1d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
@@ -70,4 +70,4 @@ public class LocalTable implements LocalHBaseState {
     scanner.close();
     return r;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java
index 68555ef..d39b01d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java
@@ -58,14 +58,14 @@ public class FamilyOnlyFilter extends FamilyFilter {
   @Override
   public ReturnCode filterKeyValue(Cell v) {
     if (done) {
-      return ReturnCode.SKIP;
+      return ReturnCode.NEXT_ROW;
     }
     ReturnCode code = super.filterKeyValue(v);
     if (previousMatchFound) {
       // we found a match before, and now we are skipping the key because of the family, therefore
       // we are done (no more of the family).
-      if (code.equals(ReturnCode.SKIP)) {
-      done = true;
+      if (code.equals(ReturnCode.SKIP) || code.equals(ReturnCode.NEXT_ROW)) {
+        done = true;
       }
     } else {
       // if we haven't seen a match before, then it doesn't matter what we see now, except to mark

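The substance of this change: in HBase 1.1 the parent FamilyFilter answers NEXT_ROW rather than SKIP for a cell whose family does not match, so both the short-circuit return value and the end-of-family detection must accept either code. A tiny sketch of the updated predicate (FamilyMismatch is an illustrative holder class, not part of Phoenix):

    import org.apache.hadoop.hbase.filter.Filter.ReturnCode;

    final class FamilyMismatch {
        // HBase 1.1's FamilyFilter may answer NEXT_ROW where older
        // versions answered SKIP, so both codes must be read as "no
        // more cells from the family we want".
        static boolean familyEnded(ReturnCode code) {
            return code == ReturnCode.SKIP || code == ReturnCode.NEXT_ROW;
        }
    }
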
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
index e225696..435a1c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
@@ -57,7 +57,7 @@ public class FilteredKeyValueScanner implements KeyValueScanner {
     /**
     * Same a {@link KeyValueScanner#next()} except that we filter out the next {@link KeyValue} until we find one that
      * passes the filter.
-     * 
+     *
     * @return the next {@link KeyValue} or <tt>null</tt> if no next {@link KeyValue} is present and passes all the
      *         filters.
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index b89c807..b5e6a63 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -59,14 +59,14 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
             Mutation m = miniBatchOp.getOperation(i);
             keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
             List<IndexMaintainer> indexMaintainers = getCodec().getIndexMaintainers(m.getAttributesMap());
-            
+
             for(IndexMaintainer indexMaintainer: indexMaintainers) {
                 if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue;
                 indexTableName.set(indexMaintainer.getIndexTableName());
                 if (maintainers.get(indexTableName) != null) continue;
                 maintainers.put(indexTableName, indexMaintainer);
             }
-            
+
         }
         if (maintainers.isEmpty()) return;
         Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
@@ -100,7 +100,7 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
     private PhoenixIndexCodec getCodec() {
         return (PhoenixIndexCodec)this.codec;
     }
-    
+
     @Override
     public byte[] getBatchId(Mutation m){
         return this.codec.getBatchId(m);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index 88e141a..52fbe9c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -31,15 +31,15 @@ import org.apache.phoenix.util.ServerUtil;
 
 public class RegionScannerResultIterator extends BaseResultIterator {
     private final RegionScanner scanner;
-    
+
     public RegionScannerResultIterator(RegionScanner scanner) {
         this.scanner = scanner;
     }
-    
+
     @Override
     public Tuple next() throws SQLException {
-        // XXX: No access here to the region instance to enclose this with startRegionOperation / 
-        // stopRegionOperation 
+        // XXX: No access here to the region instance to enclose this with startRegionOperation /
+        // stopRegionOperation
         synchronized (scanner) {
             try {
                 // TODO: size
@@ -48,6 +48,7 @@ public class RegionScannerResultIterator extends BaseResultIterator {
                 // since this is an indication of whether or not there are more values after the
                 // ones returned
                 boolean hasMore = scanner.nextRaw(results);
+
                 if (!hasMore && results.isEmpty()) {
                     return null;
                 }

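As the comment above notes, nextRaw's boolean reports whether more rows follow, not whether this particular call produced cells, which is why the iterator checks both the flag and results.isEmpty(). A hedged usage sketch of that contract (RawScanDrain and the omitted consumer are illustrative):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;

    final class RawScanDrain {
        static void drain(RegionScanner scanner) throws IOException {
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore;
            do {
                results.clear();
                // true means more rows follow; results can still be
                // empty when the current row was entirely filtered.
                hasMore = scanner.nextRaw(results);
                // consume results here (omitted in this sketch)
            } while (hasMore);
        }
    }
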
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index de59304..0e50923 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 /**
@@ -58,15 +59,15 @@ public class StatisticsScanner implements InternalScanner {
     }
 
     @Override
-    public boolean next(List<Cell> result, int limit) throws IOException {
-        boolean ret = delegate.next(result, limit);
+    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+        boolean ret = delegate.next(result, scannerContext);
         updateStat(result);
         return ret;
     }
 
     /**
     * Update the current statistics based on the lastest batch of key-values from the underlying scanner
-     * 
+     *
      * @param results
      *            next batch of {@link KeyValue}s
      */
@@ -122,4 +123,5 @@ public class StatisticsScanner implements InternalScanner {
             }
         }
     }
-}
\ No newline at end of file
+
+}

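This hunk is the core source-level incompatibility the commit addresses: HBase 1.1 replaced InternalScanner.next(List<Cell>, int limit) with next(List<Cell>, ScannerContext). A minimal delegating InternalScanner written against the 1.1 signatures (DelegatingInternalScanner is an illustrative name):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;
    import org.apache.hadoop.hbase.regionserver.ScannerContext;

    // Sketch: forwards both next() overloads, matching the interface
    // StatisticsScanner implements after this change.
    class DelegatingInternalScanner implements InternalScanner {
        private final InternalScanner delegate;

        DelegatingInternalScanner(InternalScanner delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean next(List<Cell> result) throws IOException {
            return delegate.next(result);
        }

        @Override
        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
            // The ScannerContext carries the limits the int parameter used to.
            return delegate.next(result, scannerContext);
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }
    }
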
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
index 12f1863..030b114 100644
--- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
+++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ipc.RpcScheduler.Context;
+import org.apache.hadoop.hbase.ipc.RpcServer.Connection;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -86,11 +87,12 @@ public class PhoenixIndexRpcSchedulerTest {
     }
 
     private void dispatchCallWithPriority(RpcScheduler scheduler, int priority) throws Exception {
+        Connection connection = Mockito.mock(Connection.class);
         CallRunner task = Mockito.mock(CallRunner.class);
         RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build();
         RpcServer server = new RpcServer(null, "test-rpcserver", null, isa, conf, scheduler);
         RpcServer.Call call =
-                server.new Call(0, null, null, header, null, null, null, null, 10, null);
+                server.new Call(0, null, null, header, null, null, connection, null, 10, null, null);
         Mockito.when(task.getCall()).thenReturn(call);
 
         scheduler.dispatch(task);
@@ -98,4 +100,4 @@ public class PhoenixIndexRpcSchedulerTest {
         Mockito.verify(task).getCall();
         Mockito.verifyNoMoreInteractions(task);
     }
-}
\ No newline at end of file
+}

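The test change reflects HBase 1.1's RpcServer.Call constructor taking additional arguments, including a Connection. A Mockito mock satisfies the new parameter because the scheduler test never invokes methods on it; a hedged fragment, assuming the test sits in the org.apache.hadoop.hbase.ipc package (as above) so the inner class is accessible:

    import org.apache.hadoop.hbase.ipc.RpcServer;
    import org.mockito.Mockito;

    final class MockConnection {
        // Stubbing is unnecessary: the scheduler only carries the call
        // and never dereferences the connection.
        static RpcServer.Connection create() {
            return Mockito.mock(RpcServer.Connection.class);
        }
    }
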
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
index 54db5d8..e996b23 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
@@ -37,7 +37,6 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.LocalTableState;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java
index 216f548..808e6bc 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java
@@ -47,7 +47,7 @@ public class TestFamilyOnlyFilter {
 
     kv = new KeyValue(row, fam2, qual, 10, val);
     code = filter.filterKeyValue(kv);
-    assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code);
+    assertEquals("Didn't filter out non-matching family!", ReturnCode.NEXT_ROW, code);
   }
 
   @Test
@@ -61,7 +61,7 @@ public class TestFamilyOnlyFilter {
     KeyValue kv = new KeyValue(row, fam, qual, 10, val);
 
     ReturnCode code = filter.filterKeyValue(kv);
-    assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code);
+    assertEquals("Didn't filter out non-matching family!", ReturnCode.NEXT_ROW, code);
 
     kv = new KeyValue(row, fam2, qual, 10, val);
     code = filter.filterKeyValue(kv);
@@ -69,7 +69,7 @@ public class TestFamilyOnlyFilter {
 
     kv = new KeyValue(row, fam3, qual, 10, val);
     code = filter.filterKeyValue(kv);
-    assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code);
+    assertEquals("Didn't filter out non-matching family!", ReturnCode.NEXT_ROW, code);
   }
 
   @Test
@@ -83,7 +83,7 @@ public class TestFamilyOnlyFilter {
     KeyValue kv = new KeyValue(row, fam, qual, 10, val);
 
     ReturnCode code = filter.filterKeyValue(kv);
-    assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code);
+    assertEquals("Didn't filter out non-matching family!", ReturnCode.NEXT_ROW, code);
 
     KeyValue accept = new KeyValue(row, fam2, qual, 10, val);
     code = filter.filterKeyValue(accept);
@@ -91,12 +91,12 @@ public class TestFamilyOnlyFilter {
 
     kv = new KeyValue(row, fam3, qual, 10, val);
     code = filter.filterKeyValue(kv);
-    assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code);
+    assertEquals("Didn't filter out non-matching family!", ReturnCode.NEXT_ROW, code);
 
    // we shouldn't match the family again - everything after a switched family should be ignored
     code = filter.filterKeyValue(accept);
    assertEquals("Should have skipped a 'matching' family if it arrives out of order",
-      ReturnCode.SKIP, code);
+      ReturnCode.NEXT_ROW, code);
 
     // reset the filter and we should accept it again
     filter.reset();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
index 60c11d7..ae577bd 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
@@ -317,9 +317,9 @@ public class TestWALRecoveryCaching {
         }
 
         LOG.info("Starting region server:" + server.getHostname());
-        cluster.startRegionServer(server.getHostname());
+        cluster.startRegionServer(server.getHostname(), server.getPort());
 
-        cluster.waitForRegionServerToStart(server.getHostname(), TIMEOUT);
+        cluster.waitForRegionServerToStart(server.getHostname(), server.getPort(), TIMEOUT);
 
         // start a server to get back to the base number of servers
         LOG.info("STarting server to replace " + server);

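In HBase 1.1 the mini-cluster helpers identify a region server by hostname plus port instead of hostname alone. A hedged sketch mirroring the calls above (RestartHelper and TIMEOUT_MS are illustrative; server is assumed to be a ServerName, as in the test):

    import org.apache.hadoop.hbase.MiniHBaseCluster;
    import org.apache.hadoop.hbase.ServerName;

    final class RestartHelper {
        static final long TIMEOUT_MS = 30000; // illustrative timeout

        static void restart(MiniHBaseCluster cluster, ServerName server) throws Exception {
            // Both calls take (hostname, port) in HBase 1.1.
            cluster.startRegionServer(server.getHostname(), server.getPort());
            cluster.waitForRegionServerToStart(server.getHostname(), server.getPort(), TIMEOUT_MS);
        }
    }
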
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-flume/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-flume/pom.xml b/phoenix-flume/pom.xml
index 7ed0801..b2b9a47 100644
--- a/phoenix-flume/pom.xml
+++ b/phoenix-flume/pom.xml
@@ -85,7 +85,6 @@
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-testing-util</artifactId>
-      <version>${hbase.version}</version>
       <scope>test</scope>
       <optional>true</optional>
       <exclusions>
@@ -98,7 +97,6 @@
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-it</artifactId>
-      <version>${hbase.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
       <exclusions>
@@ -111,41 +109,34 @@
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-common</artifactId>
-      <version>${hbase.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-protocol</artifactId>
-      <version>${hbase.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-client</artifactId>
-      <version>${hbase.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-hadoop-compat</artifactId>
-      <version>${hbase.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-hadoop-compat</artifactId>
-      <version>${hbase.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-hadoop2-compat</artifactId>
-      <version>${hbase.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-hadoop2-compat</artifactId>
-      <version>${hbase.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
