http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index ff44d2e..3453120 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -69,6 +69,7 @@ import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.trace.TracingIterator;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.SQLCloseable;
@@ -297,11 +298,6 @@ public abstract class BaseQueryPlan implements QueryPlan {
             // TODO: can have an hint to skip joining back to data table, in that case if any column to
             // project is not present in the index then we need to skip this plan.
             if (!dataColumns.isEmpty()) {
-                // Set data columns to be join back from data table.
-                serializeDataTableColumnsToJoin(scan, dataColumns);
-                KeyValueSchema schema = ProjectedColumnExpression.buildSchema(dataColumns);
-                // Set key value schema of the data columns.
-                serializeSchemaIntoScan(scan, schema);
                 PTable parentTable = context.getCurrentTable().getTable();
                 String parentSchemaName = parentTable.getParentSchemaName().getString();
                 String parentTableName = parentTable.getParentTableName().getString();
@@ -312,6 +308,12 @@ public abstract class BaseQueryPlan implements QueryPlan {
                             FACTORY.namedTable(null, TableName.create(parentSchemaName, parentTableName)),
                             context.getConnection()).resolveTable(parentSchemaName, parentTableName);
                 PTable dataTable = dataTableRef.getTable();
+                // Set data columns to be join back from data table.
+                serializeDataTableColumnsToJoin(scan, dataColumns, dataTable);
+                KeyValueSchema schema = ProjectedColumnExpression.buildSchema(dataColumns);
+                // Set key value schema of the data columns.
+                serializeSchemaIntoScan(scan, schema);
+                
                 // Set index maintainer of the local index.
                 serializeIndexMaintainerIntoScan(scan, dataTable);
                 // Set view constants if exists.
@@ -414,14 +416,14 @@ public abstract class BaseQueryPlan implements QueryPlan {
         }
     }
 
-    private void serializeDataTableColumnsToJoin(Scan scan, Set<PColumn> dataColumns) {
+    private void serializeDataTableColumnsToJoin(Scan scan, Set<PColumn> dataColumns, PTable dataTable) {
         ByteArrayOutputStream stream = new ByteArrayOutputStream();
         try {
             DataOutputStream output = new DataOutputStream(stream);
             WritableUtils.writeVInt(output, dataColumns.size());
             for (PColumn column : dataColumns) {
                 Bytes.writeByteArray(output, column.getFamilyName().getBytes());
-                Bytes.writeByteArray(output, column.getName().getBytes());
+                Bytes.writeByteArray(output, EncodedColumnsUtil.getColumnQualifier(column, dataTable));
             }
             scan.setAttribute(BaseScannerRegionObserver.DATA_TABLE_COLUMNS_TO_JOIN, stream.toByteArray());
         } catch (IOException e) {

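Note: serializeDataTableColumnsToJoin now writes whatever qualifier the data table's storage scheme dictates instead of the raw column name. The committed EncodedColumnsUtil is not part of this excerpt; the following is only a sketch of the kind of dispatch it presumably performs — the helper name, the int parameter, and the integer encoding are assumptions, not code from this commit:

    import org.apache.hadoop.hbase.util.Bytes;

    final class ColumnQualifierSketch {
        // Hypothetical: pick the wire qualifier for a column. Tables using the
        // encoded-columns storage scheme get a compact integer qualifier; legacy
        // tables keep the column name bytes.
        static byte[] qualifierFor(String columnName, int encodedQualifier, boolean usesEncodedColumnNames) {
            return usesEncodedColumnNames
                    ? Bytes.toBytes(encodedQualifier) // 4-byte int qualifier
                    : Bytes.toBytes(columnName);      // legacy: name bytes
        }
    }
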
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index ae78e97..51ca59c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -770,7 +770,7 @@ public class MutationState implements SQLCloseable {
                 }
                 for (PColumn column : columns) {
                     if (column != null) {
-                        resolvedTable.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
+                        resolvedTable.getColumnFamily(column.getFamilyName().getString()).getPColumnForColumnName(column.getName().getString());
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index f4ff289..087257f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -656,6 +656,7 @@ public class SortMergeJoinPlan implements QueryPlan {
                 byte[] b = new byte[length];
                 buffer.get(b);
                 Result result = ResultUtil.toResult(new ImmutableBytesWritable(b));
+                //TODO: samarth make joins work with position based look up.
                 return new ResultTuple(result);
             }
             

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
index a884949..b6e1de2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
@@ -17,6 +17,9 @@
  */
 package org.apache.phoenix.execute;
 
+import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -50,9 +53,6 @@ import org.apache.phoenix.util.SchemaUtil;
 import com.google.common.base.Preconditions;
 
 public class TupleProjector {    
-    public static final byte[] VALUE_COLUMN_FAMILY = Bytes.toBytes("_v");
-    public static final byte[] VALUE_COLUMN_QUALIFIER = new byte[0];
-    
     private static final String SCAN_PROJECTOR = "scanProjector";
     
     private final KeyValueSchema schema;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
index 4b5fdbb..35862c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
@@ -28,6 +28,7 @@ import org.apache.phoenix.expression.visitor.ExpressionVisitor;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
 
@@ -41,13 +42,13 @@ import org.apache.phoenix.util.SchemaUtil;
 public class KeyValueColumnExpression extends ColumnExpression {
     private byte[] cf;
     private byte[] cq;
-    private String displayName; // client-side only
+    private String displayName; // client-side only. TODO: samarth see what can you do for encoded column names.
 
     public KeyValueColumnExpression() {
     }
 
-    public KeyValueColumnExpression(PColumn column) {
-        this(column, null);
+    public KeyValueColumnExpression(PColumn column, boolean encodedColumnName) {
+        this(column, null, encodedColumnName);
     }
 
     public KeyValueColumnExpression(PDatum column, byte[] cf, byte[] cq) {
@@ -56,18 +57,19 @@ public class KeyValueColumnExpression extends ColumnExpression {
         this.cq = cq;
     }
 
-    public KeyValueColumnExpression(PColumn column, String displayName) {
+    public KeyValueColumnExpression(PColumn column, String displayName, boolean encodedColumnName) {
         super(column);
         this.cf = column.getFamilyName().getBytes();
-        this.cq = column.getName().getBytes();
+        this.cq = EncodedColumnsUtil.getColumnQualifier(column, encodedColumnName);
         this.displayName = displayName;
     }
 
     public byte[] getColumnFamily() {
         return cf;
     }
-
-    public byte[] getColumnName() {
+    
+    //TODO: samarth look for the callers of this.
+    public byte[] getColumnQualifier() {
         return cq;
     }
 

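Note: with getColumnName() renamed to getColumnQualifier(), the constructor decides up front which form of qualifier the expression carries. A usage sketch against the signatures in this hunk (the wrapper class is illustrative, not from the commit):

    import org.apache.phoenix.expression.KeyValueColumnExpression;
    import org.apache.phoenix.schema.PColumn;

    final class QualifierExample {
        // Returns encoded integer bytes when the table encodes column names,
        // otherwise the plain column name bytes.
        static byte[] qualifierOf(PColumn column, boolean tableUsesEncodedNames) {
            return new KeyValueColumnExpression(column, tableUsesEncodedNames).getColumnQualifier();
        }
    }
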
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
index 3a38dee..2744f35 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
@@ -154,6 +154,7 @@ public class ProjectedColumnExpression extends ColumnExpression {
         return Determinism.PER_INVOCATION;
     }
 
+    @Override
     public ProjectedColumnExpression clone() {
         return new ProjectedColumnExpression(this.column, this.columns, this.position, this.displayName);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
index b8b0350..d1f6211 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 
 /**
  * When selecting specific columns in a SELECT query, this filter passes only selected columns
@@ -53,6 +53,8 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
     private byte[] emptyCFName;
     private Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker;
     private Set<byte[]> conditionOnlyCfs;
+    private boolean usesEncodedColumnNames;
+    private byte[] emptyKVQualifier;
 
     public ColumnProjectionFilter() {
 
@@ -60,10 +62,12 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
 
     public ColumnProjectionFilter(byte[] emptyCFName,
             Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker,
-            Set<byte[]> conditionOnlyCfs) {
+            Set<byte[]> conditionOnlyCfs, boolean usesEncodedColumnNames) {
         this.emptyCFName = emptyCFName;
         this.columnsTracker = columnsTracker;
         this.conditionOnlyCfs = conditionOnlyCfs;
+        this.usesEncodedColumnNames = usesEncodedColumnNames;
+        this.emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst();
     }
 
     @Override
@@ -87,6 +91,9 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
             familyMapSize--;
         }
         int conditionOnlyCfsSize = WritableUtils.readVInt(input);
+        usesEncodedColumnNames = conditionOnlyCfsSize > 0;
+        emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst();
+        conditionOnlyCfsSize = Math.abs(conditionOnlyCfsSize) - 1; // restore to the actual value.
         this.conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
         while (conditionOnlyCfsSize > 0) {
             this.conditionOnlyCfs.add(WritableUtils.readCompressedByteArray(input));
@@ -110,12 +117,13 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
                 }
             }
         }
-        // Write conditionOnlyCfs
-        WritableUtils.writeVInt(output, this.conditionOnlyCfs.size());
+        // Encode usesEncodedColumnNames in conditionOnlyCfs size.
+        WritableUtils.writeVInt(output, (this.conditionOnlyCfs.size() + 1) * (usesEncodedColumnNames ? 1 : -1));
         for (byte[] f : this.conditionOnlyCfs) {
             WritableUtils.writeCompressedByteArray(output, f);
         }
-    }
+    
+}
 
     @Override
     public byte[] toByteArray() throws IOException {
@@ -153,9 +161,9 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
         // make sure we're not holding to any of the byte[]'s
         ptr.set(HConstants.EMPTY_BYTE_ARRAY);
         if (kvs.isEmpty()) {
-            kvs.add(new KeyValue(firstKV.getRowArray(), firstKV.getRowOffset(),firstKV.getRowLength(), this.emptyCFName,
-                    0, this.emptyCFName.length, QueryConstants.EMPTY_COLUMN_BYTES, 0,
-                    QueryConstants.EMPTY_COLUMN_BYTES.length, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0));
+            kvs.add(new KeyValue(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength(),
+                    this.emptyCFName, 0, this.emptyCFName.length, emptyKVQualifier, 0,
+                    emptyKVQualifier.length, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0));
         }
     }
 

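Note: the write/readFields pair above smuggles usesEncodedColumnNames through the existing wire format by folding it into the sign of the conditionOnlyCfs count, so older peers keep reading a plain vint. The same trick is reused for IndexMaintainer's indexedExpressions size further down. A round-trip sketch of just the encoding (class and method names are illustrative):

    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableUtils;

    final class SignedSizeCodec {
        // Writer: (size + 1) keeps the value non-zero so the sign always carries the flag.
        static void write(DataOutput out, int size, boolean flag) throws IOException {
            WritableUtils.writeVInt(out, (size + 1) * (flag ? 1 : -1));
        }
        // Reader side, applied to the vint already read:
        static boolean readFlag(int v) { return v > 0; }
        static int readSize(int v) { return Math.abs(v) - 1; }
    }
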
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
index dba700b..c3d52a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
@@ -94,7 +94,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
             refCount = foundColumns.size();
         }
         
-        public ReturnCode resolveColumn(Cell value) {
+        private ReturnCode resolveColumn(Cell value) {
             // Always set key, in case we never find a key value column of interest,
             // and our expression uses row key columns.
             setKey(value);
@@ -184,7 +184,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
         ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() {
             @Override
             public Void visit(KeyValueColumnExpression expression) {
-                inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnName());
+                inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnQualifier());
                 return null;
             }
         };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
index eaf8d35..07f7072 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
@@ -58,7 +58,7 @@ public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFi
             @Override
             public Void visit(KeyValueColumnExpression expression) {
                 cf = expression.getColumnFamily();
-                cq = expression.getColumnName();
+                cq = expression.getColumnQualifier();
                 return null;
             }
         };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 6f9caa6..0f960e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -32,7 +32,6 @@ import org.apache.phoenix.hbase.index.covered.TableState;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Lists;
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index ee67ca2..2ad0c8d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -32,8 +32,10 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
@@ -85,6 +87,7 @@ import org.apache.phoenix.schema.tuple.ValueGetterTuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.BitSet;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -275,8 +278,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     // columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key)
     private Set<ColumnReference> indexedColumns;
     private Set<ColumnReference> coveredColumns;
-    // Map used to cache column family of data table and the corresponding column family for the local index
-    private Map<ImmutableBytesPtr, ImmutableBytesWritable> dataTableLocalIndexFamilyMap;
+    // Map of covered columns where a key is column reference for a column in the data table
+    // and value is column reference for corresponding column in the index table.
+    // TODO: samarth confirm that we don't need a separate map for tracking column families of local indexes.
+    private Map<ColumnReference, ColumnReference> coveredColumnsMap;
     // columns required to create index row i.e. indexedColumns + coveredColumns  (this does not include columns in the data row key)
     private Set<ColumnReference> allColumns;
     // TODO remove this in the next major release
@@ -295,13 +300,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     private final boolean isDataTableSalted;
     private final RowKeySchema dataRowKeySchema;
     
-    private List<ImmutableBytesPtr> indexQualifiers;
     private int estimatedIndexRowKeyBytes;
     private int estimatedExpressionSize;
     private int[] dataPkPosition;
     private int maxTrailingNulls;
     private ColumnReference dataEmptyKeyValueRef;
     private boolean rowKeyOrderOptimizable;
+    private boolean usesEncodedColumnNames;
+    private ImmutableBytesPtr emptyKeyValueQualifierPtr;
     
     private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) {
         this.dataRowKeySchema = dataRowKeySchema;
@@ -315,14 +321,19 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         this.isMultiTenant = dataTable.isMultiTenant();
         this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId());
         this.isLocalIndex = index.getIndexType() == IndexType.LOCAL;
-
+        /* 
+         * There is nothing to prevent new indexes on existing tables to have encoded column names.
+         * Except, due to backward compatibility reasons, we aren't able to change IndexMaintainer and the state
+         * that is serialized in it. Because of this we are forced to have the indexes inherit the
+         * storage scheme of the parent data tables. 
+         */
+        this.usesEncodedColumnNames = EncodedColumnsUtil.usesEncodedColumnNames(dataTable);
         byte[] indexTableName = index.getPhysicalName().getBytes();
         // Use this for the nDataSaltBuckets as we need this for local indexes
         // TODO: persist nDataSaltBuckets separately, but maintain b/w compat.
         Integer nIndexSaltBuckets = isLocalIndex ? dataTable.getBucketNum() : index.getBucketNum();
         boolean indexWALDisabled = index.isWALDisabled();
         int indexPosOffset = (index.getBucketNum() == null ? 0 : 1) + (this.isMultiTenant ? 1 : 0) + (this.viewIndexId == null ? 0 : 1);
-//        int indexPosOffset = !isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0;
         int nIndexColumns = index.getColumns().size() - indexPosOffset;
         int nIndexPKColumns = index.getPKColumns().size() - indexPosOffset;
         // number of expressions that are indexed that are not present in the row key of the data table
@@ -333,7 +344,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             String dataFamilyName = IndexUtil.getDataColumnFamilyName(indexColumnName);
             String dataColumnName = IndexUtil.getDataColumnName(indexColumnName);
             try {
-                PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getColumn(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getColumn(dataColumnName);
+                PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getPColumnForColumnName(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getPColumnForColumnName(dataColumnName);
                 if (SchemaUtil.isPKColumn(dataColumn)) 
                     continue;
             } catch (ColumnNotFoundException e) {
@@ -366,7 +377,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         this.indexedColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
         this.indexedExpressions = Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
         this.coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns);
-        this.dataTableLocalIndexFamilyMap = Maps.newHashMapWithExpectedSize(nIndexColumns-nIndexPKColumns);
+        this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns - nIndexPKColumns);
         this.nIndexSaltBuckets  = nIndexSaltBuckets == null ? 0 : nIndexSaltBuckets;
         this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable);
         this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index);
@@ -408,7 +419,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 throw new RuntimeException(e); // Impossible
             }
             if ( expressionIndexCompiler.getColumnRef()!=null ) {
-               // get the column of the data table that corresponds to this index column
+               // get the column of the data column that corresponds to this index column
                    PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
                    boolean isPKColumn = SchemaUtil.isPKColumn(column);
                    if (isPKColumn) {
@@ -432,12 +443,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         for (int i = 0; i < index.getColumnFamilies().size(); i++) {
             PColumnFamily family = index.getColumnFamilies().get(i);
             for (PColumn indexColumn : family.getColumns()) {
-                PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
-                PName dataTableFamily = column.getFamilyName();
-                this.coveredColumns.add(new ColumnReference(dataTableFamily.getBytes(), column.getName().getBytes()));
-                if(isLocalIndex) {
-                    this.dataTableLocalIndexFamilyMap.put(new ImmutableBytesPtr(dataTableFamily.getBytes()), new ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(dataTableFamily.getString()))));
-                }
+                PColumn dataColumn = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
+                byte[] dataColumnCq = EncodedColumnsUtil.getColumnQualifier(dataColumn, dataTable);
+                byte[] indexColumnCq = EncodedColumnsUtil.getColumnQualifier(indexColumn, index);
+                this.coveredColumns.add(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq));
+                this.coveredColumnsMap.put(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq), 
+                        new ColumnReference(indexColumn.getFamilyName().getBytes(), indexColumnCq));
             }
         }
         this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize);
@@ -853,14 +864,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             put = new Put(indexRowKey);
             // add the keyvalue for the empty row
             put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
-                this.getEmptyKeyValueFamily(), QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts,
+                this.getEmptyKeyValueFamily(), emptyKeyValueQualifierPtr, ts,
                 // set the value to the empty column name
-                QueryConstants.EMPTY_COLUMN_BYTES_PTR));
+                emptyKeyValueQualifierPtr));
             put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
         }
-        int i = 0;
         for (ColumnReference ref : this.getCoveredColumns()) {
-            ImmutableBytesPtr cq = this.indexQualifiers.get(i++);
+            //FIXME: samarth figure out a backward compatible way to handle this as coveredcolumnsmap won't be availble for older phoenix clients.
+            ColumnReference indexColRef = this.coveredColumnsMap.get(ref);
+            ImmutableBytesPtr cq = indexColRef.getQualifierWritable();
+            ImmutableBytesPtr cf = indexColRef.getFamilyWritable();
             ImmutableBytesWritable value = valueGetter.getLatestValue(ref);
             byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey);
             ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
@@ -870,12 +883,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                     put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
                 }
                 //this is a little bit of extra work for installations that are running <0.94.14, but that should be rare and is a short-term set of wrappers - it shouldn't kill GC
-                if(this.isLocalIndex) {
-                    ImmutableBytesWritable localIndexColFamily = this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable());
-                    put.add(kvBuilder.buildPut(rowKey, localIndexColFamily, cq, ts, value));
-                } else {
-                    put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), cq, ts, value));
-                }
+                put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value));
             }
         }
         return put;
@@ -963,14 +971,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             
             for (ColumnReference ref : getCoveredColumns()) {
                 byte[] family = ref.getFamily();
-                if (this.isLocalIndex) {
-                    family = this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get();
-                }
+                ColumnReference indexColumn = coveredColumnsMap.get(ref);
                 // If table delete was single version, then index delete should be as well
                 if (deleteType == DeleteType.SINGLE_VERSION) {
-                    delete.deleteFamilyVersion(family, ts);
+                    delete.deleteFamilyVersion(indexColumn.getFamily(), ts);
                 } else {
-                    delete.deleteFamily(family, ts);
+                    delete.deleteFamily(indexColumn.getFamily(), ts);
                 }
             }
             if (deleteType == DeleteType.SINGLE_VERSION) {
@@ -991,12 +997,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                         delete = new Delete(indexRowKey);                    
                         delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
                     }
-                    byte[] family = this.isLocalIndex ? this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get() : ref.getFamily();
+                    ColumnReference indexColumn = coveredColumnsMap.get(ref);
                     // If point delete for data table, then use point delete for index as well
-                    if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) {
-                        delete.deleteColumn(family, IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                    if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { 
+                        //FIXME: samarth change this. Index column qualifiers are not derivable from data table cqs.
+                        // Figure out a backward compatible way of going this since coveredColumnsMap won't be available
+                        // for older clients.
+                        delete.deleteColumn(indexColumn.getFamily(), indexColumn.getQualifier(), ts);
                     } else {
-                        delete.deleteColumns(family, IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                        delete.deleteColumns(indexColumn.getFamily(), indexColumn.getQualifier(), ts);
                     }
                 }
             }
@@ -1051,15 +1060,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0;
         int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1;
        coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns);
-        dataTableLocalIndexFamilyMap = Maps.newHashMapWithExpectedSize(nCoveredColumns);
+        coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns);
         for (int i = 0; i < nCoveredColumns; i++) {
-            byte[] cf = Bytes.readByteArray(input);
-            byte[] cq = Bytes.readByteArray(input);
-            ColumnReference ref = new ColumnReference(cf,cq);
-            coveredColumns.add(ref);
-            if(isLocalIndex) {
-                dataTableLocalIndexFamilyMap.put(ref.getFamilyWritable(), new ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(Bytes.toString(cf)))));
-            }
+            byte[] dataTableCf = Bytes.readByteArray(input);
+            byte[] dataTableCq = Bytes.readByteArray(input);
+            byte[] indexTableCf = Bytes.readByteArray(input);
+            byte[] indexTableCq = Bytes.readByteArray(input);
+            ColumnReference dataColumn = new ColumnReference(dataTableCf, dataTableCq); 
+            coveredColumns.add(dataColumn);
+            ColumnReference indexColumn = new ColumnReference(indexTableCf, indexTableCq);
+            coveredColumnsMap.put(dataColumn, indexColumn);
         }
         // Hack to serialize whether the index row key is optimizable
         int len = WritableUtils.readVInt(input);
@@ -1085,6 +1095,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         
         if (isNewClient) {
             int numIndexedExpressions = WritableUtils.readVInt(input);
+            usesEncodedColumnNames = numIndexedExpressions > 0;
+            numIndexedExpressions = Math.abs(numIndexedExpressions) - 1;
            indexedExpressions = Lists.newArrayListWithExpectedSize(numIndexedExpressions);        
            for (int i = 0; i < numIndexedExpressions; i++) {
               Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
@@ -1161,9 +1173,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         }
        // Encode coveredColumns.size() and whether or not this is a local index
        WritableUtils.writeVInt(output, (coveredColumns.size() + 1) * (isLocalIndex ? -1 : 1));
-        for (ColumnReference ref : coveredColumns) {
-            Bytes.writeByteArray(output, ref.getFamily());
-            Bytes.writeByteArray(output, ref.getQualifier());
+        for (Entry<ColumnReference, ColumnReference> ref : coveredColumnsMap.entrySet()) {
+            ColumnReference dataColumn = ref.getKey();
+            ColumnReference indexColumn = ref.getValue();
+            Bytes.writeByteArray(output, dataColumn.getFamily());
+            Bytes.writeByteArray(output, dataColumn.getQualifier());
+            Bytes.writeByteArray(output, indexColumn.getFamily());
+            Bytes.writeByteArray(output, indexColumn.getQualifier());
         }
         // TODO: remove when rowKeyOrderOptimizable hack no longer needed
        WritableUtils.writeVInt(output,indexTableName.length * (rowKeyOrderOptimizable ? 1 : -1));
@@ -1174,7 +1190,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         WritableUtils.writeVInt(output,-emptyKeyValueCFPtr.getLength());
        output.write(emptyKeyValueCFPtr.get(),emptyKeyValueCFPtr.getOffset(), emptyKeyValueCFPtr.getLength());
         
-        WritableUtils.writeVInt(output, indexedExpressions.size());
+        // Hack to encode usesEncodedColumnNames in indexedExpressions size.
+        int indexedExpressionsSize = (indexedExpressions.size() + 1) * (usesEncodedColumnNames ? 1 : -1);
+        WritableUtils.writeVInt(output, indexedExpressionsSize);
        for (Expression expression : indexedExpressions) {
               WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
                expression.write(output);
@@ -1231,16 +1249,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      * Init calculated state reading/creating
      */
     private void initCachedState() {
-        dataEmptyKeyValueRef =
-                new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(),
-                        QueryConstants.EMPTY_COLUMN_BYTES);
-
-        indexQualifiers = Lists.newArrayListWithExpectedSize(this.coveredColumns.size());
-        for (ColumnReference ref : coveredColumns) {
-            indexQualifiers.add(new ImmutableBytesPtr(IndexUtil.getIndexColumnName(
-                ref.getFamily(), ref.getQualifier())));
-        }
-
+        byte[] emptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst();
+        dataEmptyKeyValueRef = new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier);
+        emptyKeyValueQualifierPtr = new ImmutableBytesPtr(emptyKvQualifier);
        this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumns.size());
        // columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key)
        this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size());
@@ -1248,7 +1259,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
               KeyValueExpressionVisitor visitor = new KeyValueExpressionVisitor() {
                 @Override
                 public Void visit(KeyValueColumnExpression expression) {
-                       if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnName()))) {
+                       if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnQualifier()))) {
                               indexedColumnTypes.add(expression.getDataType());
                        }
                     return null;
@@ -1513,4 +1524,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             return udfParseNodes;
         }
     }
+    
+    public byte[] getEmptyKeyValueQualifier() {
+        return emptyKeyValueQualifierPtr.copyBytes();
+    }
 }

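Note: IndexMaintainer now derives the empty key value qualifier from the storage scheme instead of hard-coding QueryConstants.EMPTY_COLUMN_BYTES. EncodedColumnsUtil.getEmptyKeyValueInfo is not in this excerpt; the following rough sketch of what it plausibly returns is an assumption — the encoded constant, the Pair layout, and the class name are all hypothetical:

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.Pair;

    final class EmptyKeyValueSketch {
        // Legacy tables keep the "_0" empty column; encoded tables are assumed
        // to use the reserved ENCODED_EMPTY_COLUMN_NAME integer qualifier.
        static Pair<byte[], byte[]> emptyKeyValueInfo(boolean usesEncodedColumnNames) {
            byte[] qualifier = usesEncodedColumnNames
                    ? Bytes.toBytes(0)      // stand-in for ENCODED_EMPTY_COLUMN_NAME
                    : Bytes.toBytes("_0");  // stand-in for QueryConstants.EMPTY_COLUMN_NAME
            return new Pair<>(qualifier, qualifier); // (qualifier, value)
        }
    }
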
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index c67da6e..9ee5ea7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -67,7 +67,6 @@ import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
@@ -304,8 +303,16 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
                 for (ColumnReference ref : mutableColumns) {
                     scan.addColumn(ref.getFamily(), ref.getQualifier());
                 }
+                /*
+                 * Indexes inherit the storage scheme of the data table which means all the indexes have the same
+                 * storage scheme and empty key value qualifier. Note that this assumption would be broken if we start
+                 * supporting new indexes over existing data tables to have a different storage scheme than the data
+                 * table.
+                 */
+                byte[] emptyKeyValueQualifier = indexMaintainers.get(0).getEmptyKeyValueQualifier();
+                
                 // Project empty key value column
-                scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+                scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
                 ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
                 scanRanges.initializeScan(scan);
                 TableName tableName = env.getRegion().getRegionInfo().getTable();
@@ -356,7 +363,8 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException {
         if (scanner != null) {
             Result result;
-            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0)
+                    .getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
             // Process existing data table rows by removing the old index row and adding the new index row
             while ((result = scanner.next()) != null) {
                 Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow()));
@@ -384,7 +392,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             // to generate point delete markers for all index rows that were added. We don't have Tephra
             // manage index rows in change sets because we don't want to be hit with the additional
             // memory hit and do not need to do conflict detection on index rows.
-            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
             while ((result = scanner.next()) != null) {
                 Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
                 // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index ceba000..496b3b0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -17,12 +17,16 @@
  */
 package org.apache.phoenix.iterate;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.util.ScanUtil.setMinMaxQualifiersOnScan;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
@@ -48,6 +52,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hbase.HConstants;
+import javax.management.Query;
+
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
@@ -87,8 +93,10 @@ import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.PTableStats;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PrefixByteCodec;
 import org.apache.phoenix.util.PrefixByteDecoder;
@@ -207,7 +215,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                             // Project empty key value unless the column family containing it has
                             // been projected in its entirety.
                             if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
-                                scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
+                                scan.addColumn(ecf, EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst());
                             }
                             }
                         }
                     }
@@ -225,7 +233,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
             if(offset!=null){
                 ScanUtil.addOffsetAttribute(scan, offset);
             }
-
             int cols = plan.getGroupBy().getOrderPreservingColumnCount();
             if (cols > 0 && context.getWhereConditionColumns().size() == 0 &&
                 !plan.getStatement().getHint().hasHint(HintNode.Hint.RANGE_SCAN) &&
                 ScanUtil.andFilterAtBeginning(scan, 
                         new DistinctPrefixFilter(plan.getTableRef().getTable().getRowKeySchema(),
DistinctPrefixFilter(plan.getTableRef().getTable().getRowKeySchema(),
                                 cols));
             }
-
+            //TODO: samarth add condition to not do position based look ups in case of joins so that we won't need to do the hacky check inside co-processors.
+            if (setMinMaxQualifiersOnScan(table)) {
+                Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiers(scan, context);
+                if (minMaxQualifiers != null) {
+                    scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER, PInteger.INSTANCE.toBytes(minMaxQualifiers.getFirst()));
+                    scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER, PInteger.INSTANCE.toBytes(minMaxQualifiers.getSecond()));
+                }
+            }
             if (optimizeProjection) {
                 optimizeProjection(context, scan, table, statement);
             }
         }
     }
-
+    
+    private static Pair<Integer, Integer> getMinMaxQualifiers(Scan scan, StatementContext context) {
+        PTable table = context.getCurrentTable().getTable();
+        checkArgument(EncodedColumnsUtil.usesEncodedColumnNames(table), "Method should only be used for tables using encoded column names");
+        Integer minQualifier = null;
+        Integer maxQualifier = null;
+        boolean emptyKVProjected = false;
+        for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
+            byte[] cq = whereCol.getSecond();
+            if (cq != null) {
+                int qualifier = (Integer)PInteger.INSTANCE.toObject(cq);
+                if (qualifier == ENCODED_EMPTY_COLUMN_NAME) {
+                    emptyKVProjected = true;
+                    continue;
+                }
+                if (minQualifier == null && maxQualifier == null) {
+                    minQualifier = maxQualifier = qualifier;
+                } else {
+                    if (qualifier < minQualifier) {
+                        minQualifier = qualifier;
+                    } else if (qualifier > maxQualifier) {
+                        maxQualifier = qualifier;
+                    }
+                }
+            }
+        }
+        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+        for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+            if (entry.getValue() != null) {
+                for (byte[] cq : entry.getValue()) {
+                    if (cq != null) {
+                        int qualifier = (Integer)PInteger.INSTANCE.toObject(cq);
+                        if (qualifier == ENCODED_EMPTY_COLUMN_NAME) {
+                            emptyKVProjected = true;
+                            continue;
+                        }
+                        if (minQualifier == null && maxQualifier == null) {
+                            minQualifier = maxQualifier = qualifier;
+                        } else {
+                            if (qualifier < minQualifier) {
+                                minQualifier = qualifier;
+                            } else if (qualifier > maxQualifier) {
+                                maxQualifier = qualifier;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        if (minQualifier == null && emptyKVProjected) {
+            return new Pair<>(ENCODED_EMPTY_COLUMN_NAME, ENCODED_EMPTY_COLUMN_NAME);
+        } else if (minQualifier == null) {
+            return null;
+        }
+        return new Pair<>(minQualifier, maxQualifier);
+    }
+    
    private static void optimizeProjection(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
         Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
         // columnsTracker contain cf -> qualifiers which should get returned.
@@ -340,7 +410,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
             // the ExplicitColumnTracker not to be used, though.
             if (!statement.isAggregate() && filteredColumnNotInProjection) {
                ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
-                        columnsTracker, conditionOnlyCfs));
+                        columnsTracker, conditionOnlyCfs, EncodedColumnsUtil.usesEncodedColumnNames(table)));
             }
         }
     }

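Note: getMinMaxQualifiers computes the smallest and largest encoded qualifiers touched by the scan so server-side code can size position-based lookups. A sketch of the read side a coprocessor might use, built only from the attributes and types visible in this hunk (the reader class itself is illustrative, not code from this commit):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
    import org.apache.phoenix.schema.types.PInteger;

    final class MinMaxQualifierReader {
        // Recovers the qualifier range the client serialized as scan attributes.
        static Pair<Integer, Integer> read(Scan scan) {
            byte[] min = scan.getAttribute(BaseScannerRegionObserver.MIN_QUALIFIER);
            byte[] max = scan.getAttribute(BaseScannerRegionObserver.MAX_QUALIFIER);
            if (min == null || max == null) return null;
            return new Pair<>((Integer) PInteger.INSTANCE.toObject(min),
                    (Integer) PInteger.INSTANCE.toObject(max));
        }
    }
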
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
index 3293f65..1e5f09e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -49,7 +49,7 @@ abstract public class LookAheadResultIterator implements PeekingResultIterator {
         };
     }
     
-    private final static Tuple UNINITIALIZED = new ResultTuple();
+    private final static Tuple UNINITIALIZED = ResultTuple.EMPTY_TUPLE;
     private Tuple next = UNINITIALIZED;
     
     abstract protected Tuple advance() throws SQLException;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
index 8ada952..135ab26 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
@@ -180,6 +180,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
             return this.index;
         }
         
+        @Override
         public int size() {
             if (flushBuffer)
                 return flushedCount;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 8dcb2e8..e4c52c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.SizedUtil;
 
 import com.google.common.base.Function;
@@ -264,7 +265,7 @@ public class OrderedResultIterator implements PeekingResultIterator {
             }
             this.byteSize = queueEntries.getByteSize();
         } catch (IOException e) {
-            throw new SQLException("", e);
+            ServerUtil.createIOException(e.getMessage(), e);
         } finally {
             delegate.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index 88e141a..531bbe7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -24,16 +24,24 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 
 public class RegionScannerResultIterator extends BaseResultIterator {
     private final RegionScanner scanner;
+    private final Pair<Integer, Integer> minMaxQualifiers;
+    private final boolean useQualifierAsIndex;
     
-    public RegionScannerResultIterator(RegionScanner scanner) {
+    public RegionScannerResultIterator(RegionScanner scanner, Pair<Integer, Integer> minMaxQualifiers, boolean isJoin) {
         this.scanner = scanner;
+        this.useQualifierAsIndex = ScanUtil.useQualifierAsIndex(minMaxQualifiers, isJoin);
+        this.minMaxQualifiers = minMaxQualifiers;
     }
     
     @Override
@@ -43,7 +51,7 @@ public class RegionScannerResultIterator extends BaseResultIterator {
         synchronized (scanner) {
             try {
                 // TODO: size
-                List<Cell> results = new ArrayList<Cell>();
+                List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) :  new ArrayList<Cell>();
                 // Results are potentially returned even when the return value of s.next is false
                 // since this is an indication of whether or not there are more values after the
                 // ones returned
@@ -53,7 +61,7 @@ public class RegionScannerResultIterator extends BaseResultIterator {
                 }
                 // We instantiate a new tuple because in all cases currently we hang on to it
                 // (i.e. to compute and hold onto the TopN).
-                MultiKeyValueTuple tuple = new MultiKeyValueTuple();
+                Tuple tuple = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
                 tuple.setKeyValues(results);
                 return tuple;
             } catch (IOException e) {
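
The useQualifierAsIndex path above trades the usual ArrayList plus
name-based lookups for a structure indexed directly by the encoded
qualifier. A simplified, hypothetical stand-in for
EncodedColumnQualiferCellsList illustrating the idea (the real class
stores HBase Cells and is bounded by the scan's min/max qualifiers):

    // Hedged sketch: when qualifiers are encoded integers in a known
    // [min, max] range, the qualifier itself can serve as an array index,
    // replacing per-cell binary searches by name with O(1) lookups.
    final class PositionIndexedCells<E> {
        private final Object[] slots;
        private final int minQualifier;

        PositionIndexedCells(int minQualifier, int maxQualifier) {
            this.minQualifier = minQualifier;
            this.slots = new Object[maxQualifier - minQualifier + 1];
        }

        void put(int encodedQualifier, E cell) {
            slots[encodedQualifier - minQualifier] = cell; // qualifier -> position
        }

        @SuppressWarnings("unchecked")
        E get(int encodedQualifier) {
            return (E) slots[encodedQualifier - minQualifier];
        }
    }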

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 2927de1..7da41c7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -108,9 +108,9 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
+import org.apache.phoenix.util.SchemaUtil;
 import org.cloudera.htrace.Sampler;
 import org.cloudera.htrace.TraceScope;
-import org.apache.phoenix.util.SchemaUtil;
 
 import org.apache.tephra.TransactionContext;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 4fd4485..0e7db1c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -205,7 +205,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final byte[] BASE_COLUMN_COUNT_BYTES = Bytes.toBytes(BASE_COLUMN_COUNT);
     public static final String IS_ROW_TIMESTAMP = "IS_ROW_TIMESTAMP";
     public static final byte[] IS_ROW_TIMESTAMP_BYTES = Bytes.toBytes(IS_ROW_TIMESTAMP);
-
+    
     public static final String TABLE_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY;
     public static final byte[] TABLE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
 
@@ -315,6 +315,13 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     /** Version below which we fall back on the generic KeyValueBuilder */
     public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14");
     
+    public static final String STORAGE_SCHEME = "STORAGE_SCHEME";
+    public static final byte[] STORAGE_SCHEME_BYTES = Bytes.toBytes(STORAGE_SCHEME);
+    public static final String ENCODED_COLUMN_QUALIFIER = "COLUMN_QUALIFIER";
+    public static final byte[] ENCODED_COLUMN_QUALIFIER_BYTES = Bytes.toBytes(ENCODED_COLUMN_QUALIFIER);
+    public static final String COLUMN_QUALIFIER_COUNTER = "QUALIFIER_COUNTER";
+    public static final byte[] COLUMN_QUALIFIER_COUNTER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER_COUNTER);
+
     PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException {
         this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false));
         this.connection = connection;
         this.connection = connection;
@@ -588,9 +595,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
                 newCells.addAll(cells);
                 newCells.add(kv);
                 Collections.sort(newCells, KeyValue.COMPARATOR);
-                resultTuple.setResult(Result.create(newCells));
+                tuple = new ResultTuple(Result.create(newCells));
             }
-
             return tuple;
         }
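
The switch from resultTuple.setResult(...) to constructing a fresh
ResultTuple reflects that an HBase Result is immutable: adding the
synthetic cell means copy, re-sort, rebuild. A hedged sketch of that
pattern (withExtraCell is a made-up helper name, not a Phoenix method):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    // Sketch of the rebuild-rather-than-mutate pattern from the hunk above.
    final class ResultRebuildSketch {
        static Result withExtraCell(Result original, KeyValue extra) {
            List<Cell> newCells = new ArrayList<Cell>(original.listCells());
            newCells.add(extra);
            Collections.sort(newCells, KeyValue.COMPARATOR); // Result expects sorted cells
            return Result.create(newCells);
        }

        public static void main(String[] args) {
            KeyValue a = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("0"),
                    Bytes.toBytes("q1"), Bytes.toBytes("v1"));
            Result base = Result.create(Collections.<Cell>singletonList(a));
            KeyValue b = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("0"),
                    Bytes.toBytes("q2"), Bytes.toBytes("v2"));
            System.out.println(withExtraCell(base, b).size()); // prints 2
        }
    }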
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 47c17ae..3ca48a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -107,7 +107,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
     private final static String STRING_FALSE = "0";
     private final static BigDecimal BIG_DECIMAL_FALSE = BigDecimal.valueOf(0);
     private final static Integer INTEGER_FALSE = Integer.valueOf(0);
-    private final static Tuple BEFORE_FIRST = new ResultTuple();
+    private final static Tuple BEFORE_FIRST = ResultTuple.EMPTY_TUPLE;
 
     private final ResultIterator scanner;
     private final RowProjector rowProjector;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index 908a117..2d7550a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -122,6 +122,7 @@ public class HashCacheFactory implements ServerCacheFactory {
                     int resultSize = (int)Bytes.readVLong(hashCacheByteArray, offset);
                     offset += WritableUtils.decodeVIntSize(hashCacheByteArray[offset]);
                     ImmutableBytesWritable value = new ImmutableBytesWritable(hashCacheByteArray,offset,resultSize);
+                    //TODO: samarth make joins work with position look up.
                     Tuple result = new ResultTuple(ResultUtil.toResult(value));
                     ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(result, onExpressions);
                     List<Tuple> tuples = hashCacheMap.get(key);
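
For reference, the loop above walks a stream of vint-length-prefixed
records: Bytes.readVLong reads the payload size and decodeVIntSize
advances past the prefix. A hedged sketch of that layout, with the
encode side reconstructed by assumption (this is not the actual Phoenix
hash cache serializer):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.WritableUtils;

    // Hedged sketch of a vint-length-prefixed record stream, mirroring
    // how the hash cache bytes are consumed above.
    final class VIntRecordSketch {
        static byte[] encode(byte[][] payloads) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (byte[] p : payloads) {
                WritableUtils.writeVInt(out, p.length); // vint size prefix
                out.write(p);
            }
            return bytes.toByteArray();
        }

        static int countRecords(byte[] buf) throws IOException {
            int offset = 0, count = 0;
            while (offset < buf.length) {
                int size = (int) Bytes.readVLong(buf, offset);       // record size
                offset += WritableUtils.decodeVIntSize(buf[offset]); // skip the prefix
                offset += size;                                      // skip the payload
                count++;
            }
            return count;
        }
    }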

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index cacbce7..d94fa42 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.TreeMap;
+
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.conf.Configuration;
@@ -48,6 +49,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -207,7 +209,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
     not care about it
      */
     private void initColumnIndexes() throws SQLException {
-        columnIndexes = new TreeMap(Bytes.BYTES_COMPARATOR);
+        columnIndexes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
         int columnIndex = 0;
         for(int index = 0; index < logicalNames.size(); index++) {
             PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
@@ -215,18 +217,23 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
             for (int i = 0; i < cls.size(); i++) {
                 PColumn c = cls.get(i);
                 byte[] family = new byte[0];
-                if (c.getFamilyName() != null)  // Skip PK column
+                byte[] cq;
+                if (!SchemaUtil.isPKColumn(c)) {
                     family = c.getFamilyName().getBytes();
-                byte[] name = c.getName().getBytes();
-                byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
+                    cq = EncodedColumnsUtil.getColumnQualifier(c, table);
+                } else {
+                    // TODO: samarth verify if this is the right thing to do here.
+                    cq = c.getName().getBytes();
+                }
+                byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
                 if (!columnIndexes.containsKey(cfn)) {
                     columnIndexes.put(cfn, new Integer(columnIndex));
                     columnIndex++;
                 }
             }
             byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
-            byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES,
-                    QueryConstants.EMPTY_COLUMN_BYTES);
+            byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+            byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue);
             columnIndexes.put(cfn, new Integer(columnIndex));
             columnIndex++;
         }
@@ -242,9 +249,9 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
     private int findIndex(Cell cell) throws IOException {
         byte[] familyName = Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(),
                 cell.getFamilyLength());
-        byte[] name = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
+        byte[] cq = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
                 cell.getQualifierLength());
-        byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
+        byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
         if(columnIndexes.containsKey(cfn)) {
             return columnIndexes.get(cfn);
         }
@@ -397,4 +404,4 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
             return keyValues;
         }
     }
-}
+}
\ No newline at end of file
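
initColumnIndexes() and findIndex() above key a single TreeMap by
family + separator + qualifier so that cells from several tables map to
stable integer indexes. A hedged sketch of that scheme; the ':'
separator is a stand-in for QueryConstants.NAMESPACE_SEPARATOR_BYTES:

    import java.util.TreeMap;

    import org.apache.hadoop.hbase.util.Bytes;

    // Hedged sketch of the family:qualifier -> index map built above.
    final class ColumnIndexSketch {
        private static final byte[] SEPARATOR = Bytes.toBytes(":");
        private final TreeMap<byte[], Integer> columnIndexes =
                new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
        private int nextIndex = 0;

        void register(byte[] family, byte[] qualifier) {
            byte[] cfn = Bytes.add(family, SEPARATOR, qualifier);
            if (!columnIndexes.containsKey(cfn)) {
                columnIndexes.put(cfn, nextIndex++); // first registration wins
            }
        }

        Integer find(byte[] family, byte[] qualifier) {
            return columnIndexes.get(Bytes.add(family, SEPARATOR, qualifier));
        }
    }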

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index 15d6d2f..c5f690b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -44,6 +44,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -89,7 +90,7 @@ public class FormatToKeyValueReducer
     }
 
     private void initColumnsMap(PhoenixConnection conn) throws SQLException {
-        Map<byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR);
+        Map<byte[], Integer> indexMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
         columnIndexes = new HashMap<>();
         int columnIndex = 0;
         for (int index = 0; index < logicalNames.size(); index++) {
@@ -98,12 +99,16 @@ public class FormatToKeyValueReducer
             for (int i = 0; i < cls.size(); i++) {
                 PColumn c = cls.get(i);
                 byte[] family = new byte[0];
-                if (c.getFamilyName() != null) {
+                byte[] cq;
+                if (!SchemaUtil.isPKColumn(c)) {
                     family = c.getFamilyName().getBytes();
+                    cq = EncodedColumnsUtil.getColumnQualifier(c, table);
+                } else {
+                    // TODO: samarth verify if this is the right thing to do here.
+                    cq = c.getName().getBytes();
                 }
-                byte[] name = c.getName().getBytes();
-                byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
-                Pair<byte[], byte[]> pair = new Pair(family, name);
+                byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
+                Pair<byte[], byte[]> pair = new Pair<>(family, cq);
                 if (!indexMap.containsKey(cfn)) {
                     indexMap.put(cfn, new Integer(columnIndex));
                     columnIndexes.put(new Integer(columnIndex), pair);
@@ -111,8 +116,8 @@ public class FormatToKeyValueReducer
                 }
             }
             byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
-            Pair<byte[], byte[]> pair = new Pair(emptyColumnFamily, QueryConstants
-                    .EMPTY_COLUMN_BYTES);
+            byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+            Pair<byte[], byte[]> pair = new Pair<>(emptyColumnFamily, emptyKeyValue);
             columnIndexes.put(new Integer(columnIndex), pair);
             columnIndex++;
         }
@@ -123,18 +128,17 @@ public class FormatToKeyValueReducer
                           Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
             throws IOException, InterruptedException {
         TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
-        ImmutableBytesWritable rowKey = key.getRowkey();
         for (ImmutableBytesWritable aggregatedArray : values) {
             DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
             while (input.available() != 0) {
                 byte type = input.readByte();
                 int index = WritableUtils.readVInt(input);
                 ImmutableBytesWritable family;
-                ImmutableBytesWritable name;
+                ImmutableBytesWritable cq;
                 ImmutableBytesWritable value = QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR;
                 Pair<byte[], byte[]> pair = columnIndexes.get(index);
                 family = new ImmutableBytesWritable(pair.getFirst());
-                name = new ImmutableBytesWritable(pair.getSecond());
+                cq = new ImmutableBytesWritable(pair.getSecond());
                 int len = WritableUtils.readVInt(input);
                 if (len > 0) {
                     byte[] array = new byte[len];
@@ -145,10 +149,10 @@ public class FormatToKeyValueReducer
                 KeyValue.Type kvType = KeyValue.Type.codeToType(type);
                 switch (kvType) {
                     case Put: // not null value
-                        kv = builder.buildPut(key.getRowkey(), family, name, value);
+                        kv = builder.buildPut(key.getRowkey(), family, cq, value);
                         break;
                     case DeleteColumn: // null value
-                        kv = builder.buildDeleteColumns(key.getRowkey(), family, name);
+                        kv = builder.buildDeleteColumns(key.getRowkey(), family, cq);
                         break;
                     default:
                         throw new IOException("Unsupported KeyValue type " + kvType);
@@ -164,4 +168,4 @@ public class FormatToKeyValueReducer
             if (++index % 100 == 0) context.setStatus("Wrote " + index);
         }
     }
-}
+}
\ No newline at end of file
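
The reduce() loop above consumes a compact per-cell record:
[1-byte KeyValue type code][vint column index][vint value length][value
bytes]. A hedged reconstruction, with the encode side (which lives in
the mapper) assumed from the decode logic:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableUtils;

    // Hedged sketch of the per-cell record stream consumed by reduce() above.
    final class CellRecordSketch {
        static byte[] encode(byte typeCode, int columnIndex, byte[] value) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeByte(typeCode);                      // KeyValue.Type code
            WritableUtils.writeVInt(out, columnIndex);    // resolves to (family, qualifier)
            WritableUtils.writeVInt(out, value.length);   // 0 signals a null value
            out.write(value);
            return bytes.toByteArray();
        }

        static void decode(byte[] buf) throws IOException {
            DataInputStream input = new DataInputStream(new ByteArrayInputStream(buf));
            while (input.available() != 0) {
                byte type = input.readByte();
                int index = WritableUtils.readVInt(input);
                int len = WritableUtils.readVInt(input);
                byte[] value = new byte[len];
                if (len > 0) {
                    input.readFully(value);
                }
                // build a Put or DeleteColumn KeyValue from (index, type, value) here
            }
        }
    }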

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f14371d..bdb2432 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -32,8 +32,6 @@ import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
 
 import java.io.IOException;
 import java.lang.ref.WeakReference;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -85,7 +83,6 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator;
-import org.apache.hadoop.hbase.regionserver.LocalIndexSplitter;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -161,7 +158,6 @@ import org.apache.phoenix.schema.PMetaDataImpl;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.ReadOnlyTableException;
@@ -182,7 +178,6 @@ import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.ConfigUtil;
-import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.JDBCUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixContextExecutor;
@@ -570,6 +565,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
     }
     
+    @Override
     public PMetaData updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException {
        synchronized (latestMetaDataLock) {
             throwConnectionClosedIfNullMetaData();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 9f8f58c..b1733f0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -31,6 +31,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG;
@@ -40,6 +41,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH;
@@ -85,6 +87,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SOURCE_DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATETIME_SUB;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
@@ -118,7 +121,7 @@ import org.apache.phoenix.schema.MetaDataSplitPolicy;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.schema.types.PInteger;
 
 
 /**
@@ -148,23 +151,30 @@ public interface QueryConstants {
     public final static byte[] OFFSET_ROW_KEY_BYTES = Bytes.toBytes(OFFSET_ROW_KEY);
     public final static ImmutableBytesPtr OFFSET_ROW_KEY_PTR = new ImmutableBytesPtr(OFFSET_ROW_KEY_BYTES);
 
-    public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s");
-    public final static PName SINGLE_COLUMN_FAMILY_NAME = PNameFactory.newNormalizedName("s");
-    public final static byte[] SINGLE_COLUMN = SINGLE_COLUMN_NAME.getBytes();
-    public final static byte[] SINGLE_COLUMN_FAMILY = SINGLE_COLUMN_FAMILY_NAME.getBytes();
-
     public static final long AGG_TIMESTAMP = HConstants.LATEST_TIMESTAMP;
     /**
      * Key used for a single row aggregation where there is no group by
      */
     public final static byte[] UNGROUPED_AGG_ROW_KEY = Bytes.toBytes("a");
-    public final static PName AGG_COLUMN_NAME = SINGLE_COLUMN_NAME;
-    public final static PName AGG_COLUMN_FAMILY_NAME = SINGLE_COLUMN_FAMILY_NAME;
-
-    public static final byte[] ARRAY_VALUE_COLUMN_FAMILY = Bytes.toBytes("a");
-    // Use empty byte array for column qualifier so as not to accidentally conflict with any other columns
-    public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = ByteUtil.EMPTY_BYTE_ARRAY;
+
+    /** BEGIN Set of reserved column qualifiers **/
+
+    public static final String RESERVED_COLUMN_FAMILY = "_r";
+    public static final byte[] RESERVED_COLUMN_FAMILY_BYTES = Bytes.toBytes(RESERVED_COLUMN_FAMILY);
+
+    public static final byte[] VALUE_COLUMN_FAMILY = RESERVED_COLUMN_FAMILY_BYTES;
+    public static final byte[] VALUE_COLUMN_QUALIFIER = PInteger.INSTANCE.toBytes(1);
+
+    public static final byte[] ARRAY_VALUE_COLUMN_FAMILY = RESERVED_COLUMN_FAMILY_BYTES;
+    public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = PInteger.INSTANCE.toBytes(2);
+
+    public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s");
+    public final static PName SINGLE_COLUMN_FAMILY_NAME = PNameFactory.newNormalizedName("s");
+    public final static byte[] SINGLE_COLUMN = PInteger.INSTANCE.toBytes(3);
+    public final static byte[] SINGLE_COLUMN_FAMILY = RESERVED_COLUMN_FAMILY_BYTES;
 
+    /** END Set of reserved column qualifiers **/
+
     public static final byte[] TRUE = new byte[] {1};
     
 
@@ -186,11 +196,18 @@ public interface QueryConstants {
     public static final byte[] EMPTY_COLUMN_BYTES = Bytes.toBytes(EMPTY_COLUMN_NAME);
     public static final ImmutableBytesPtr EMPTY_COLUMN_BYTES_PTR = new ImmutableBytesPtr(
             EMPTY_COLUMN_BYTES);
+    public static final Integer ENCODED_EMPTY_COLUMN_NAME = 0;
+    public static final byte[] ENCODED_EMPTY_COLUMN_BYTES = PInteger.INSTANCE.toBytes(ENCODED_EMPTY_COLUMN_NAME);
+    public static final ImmutableBytesPtr ENCODED_EMPTY_COLUMN_BYTES_PTR = new ImmutableBytesPtr(
+            ENCODED_EMPTY_COLUMN_BYTES);
     public final static String EMPTY_COLUMN_VALUE = "x";
     public final static byte[] EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE);
     public static final ImmutableBytesPtr EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr(
             EMPTY_COLUMN_VALUE_BYTES);
-
+    public static final String ENCODED_EMPTY_COLUMN_VALUE = EMPTY_COLUMN_VALUE;
+    public final static byte[] ENCODED_EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE);
+    public static final ImmutableBytesPtr ENCODED_EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr(
+            ENCODED_EMPTY_COLUMN_VALUE_BYTES);
     public static final String DEFAULT_COLUMN_FAMILY = "0";
     public static final byte[] DEFAULT_COLUMN_FAMILY_BYTES = Bytes.toBytes(DEFAULT_COLUMN_FAMILY);
     public static final ImmutableBytesPtr DEFAULT_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr(
@@ -216,6 +233,13 @@ public interface QueryConstants {
     public static final int NANOS_IN_SECOND = BigDecimal.valueOf(Math.pow(10, 9)).intValue();
     public static final int DIVERGED_VIEW_BASE_COLUMN_COUNT = -100;
     public static final int BASE_TABLE_BASE_COLUMN_COUNT = -1;
+    
+    //TODO: samarth think about this more.
+    /**
+     * We mark counter values 0 to 10 as reserved. Value 0 is used by {@link #ENCODED_EMPTY_COLUMN_NAME}. Values 1-10
+     * are reserved for special column qualifiers returned by Phoenix co-processors.
+     */
+    public static final int ENCODED_CQ_COUNTER_INITIAL_VALUE = 11;
     public static final String CREATE_TABLE_METADATA =
             // Do not use IF NOT EXISTS as we sometimes catch the TableAlreadyExists
             // exception and add columns to the SYSTEM.TABLE dynamically.
@@ -282,6 +306,9 @@ public interface QueryConstants {
             IS_NAMESPACE_MAPPED + " BOOLEAN," +
             AUTO_PARTITION_SEQ + " VARCHAR," +
             APPEND_ONLY_SCHEMA + " BOOLEAN," +
+            ENCODED_COLUMN_QUALIFIER + " INTEGER," +
+            STORAGE_SCHEME + " TINYINT, " + 
+            COLUMN_QUALIFIER_COUNTER + " INTEGER, " +
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + 
TENANT_ID + ","
             + TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + 
COLUMN_FAMILY + "))\n" +
             HConstants.VERSIONS + "=" + 
MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
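
Taken together, the constants above set up a numbering convention for
encoded column qualifiers: 0 is the empty key value, 1-10 are reserved
for special columns returned by co-processors, and per-table counters
start handing out 11 and up. A hedged sketch; the 4-byte big-endian
encoding stands in for PInteger.INSTANCE.toBytes(...) and the
AtomicInteger for the persisted per-table qualifier counter:

    import java.util.concurrent.atomic.AtomicInteger;

    // Hedged sketch of the reserved encoded-qualifier ranges above.
    final class EncodedQualifierSketch {
        static final int ENCODED_EMPTY_COLUMN_NAME = 0; // empty key value qualifier
        static final int FIRST_UNRESERVED = 11;         // 1-10: reserved for co-processors
        private final AtomicInteger counter = new AtomicInteger(FIRST_UNRESERVED);

        int nextQualifier() {
            return counter.getAndIncrement();
        }

        static byte[] encode(int qualifier) {
            return new byte[] {
                    (byte) (qualifier >>> 24), (byte) (qualifier >>> 16),
                    (byte) (qualifier >>> 8), (byte) qualifier };
        }
    }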

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 892482d..0761b73 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -238,7 +238,7 @@ public class QueryServicesOptions {
     // doesn't depend on phoenix-core.
     public static final String DEFAULT_QUERY_SERVER_SERIALIZATION = "PROTOBUF";
     public static final int DEFAULT_QUERY_SERVER_HTTP_PORT = 8765;
-    public static final boolean DEFAULT_RENEW_LEASE_ENABLED = true;
+    public static final boolean DEFAULT_RENEW_LEASE_ENABLED = false;
     public static final int DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS =
             DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD / 2;
     public static final int DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS =
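
With the default flipped to false, scanner lease renewal now has to be
opted into on the client. A hedged sketch of doing so via connection
properties; the property key below is an assumption, confirm it against
QueryServices.RENEW_LEASE_ENABLED in your Phoenix version:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;

    // Hedged sketch: re-enable scanner lease renewal per connection.
    // The property name is assumed, not confirmed by this commit.
    final class RenewLeaseOptIn {
        static Connection connect(String jdbcUrl) throws Exception {
            Properties props = new Properties();
            props.setProperty("phoenix.scanner.lease.renew.enabled", "true");
            return DriverManager.getConnection(jdbcUrl, props);
        }
    }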

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
index 76f6218..544fb20 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
@@ -22,6 +22,7 @@ import org.apache.phoenix.expression.ColumnExpression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
 
@@ -45,7 +46,7 @@ public class ColumnRef {
     }
 
     public ColumnRef(TableRef tableRef, String familyName, String columnName) throws MetaDataEntityNotFoundException {
-        this(tableRef, tableRef.getTable().getColumnFamily(familyName).getColumn(columnName).getPosition());
+        this(tableRef, tableRef.getTable().getColumnFamily(familyName).getPColumnForColumnName(columnName).getPosition());
     }
 
     public ColumnRef(TableRef tableRef, int columnPosition) {
@@ -109,7 +110,7 @@ public class ColumnRef {
                return new ProjectedColumnExpression(column, table, displayName);
         }
        
-        return new KeyValueColumnExpression(column, displayName);
+        return new KeyValueColumnExpression(column, displayName, EncodedColumnsUtil.usesEncodedColumnNames(table));
     }
 
     public ColumnRef cloneAtTimestamp(long timestamp) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
index a60229e..4ac8f46 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
@@ -90,4 +90,9 @@ public class DelegateColumn extends DelegateDatum implements PColumn {
        public boolean isDynamic() {
                return getDelegate().isDynamic();
        }
+
+    @Override
+    public Integer getEncodedColumnQualifier() {
+        return getDelegate().getEncodedColumnQualifier();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 3ee012f..c7547c3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -96,8 +96,8 @@ public class DelegateTable implements PTable {
     }
 
     @Override
-    public PColumn getColumn(String name) throws ColumnNotFoundException, AmbiguousColumnException {
-        return delegate.getColumn(name);
+    public PColumn getPColumnForColumnName(String name) throws ColumnNotFoundException, AmbiguousColumnException {
+        return delegate.getPColumnForColumnName(name);
     }
 
     @Override
@@ -280,4 +280,20 @@ public class DelegateTable implements PTable {
     public boolean isAppendOnlySchema() {
         return delegate.isAppendOnlySchema();
     }
+    
+    @Override
+    public StorageScheme getStorageScheme() {
+        return delegate.getStorageScheme();
+    }
+
+    @Override
+    public PColumn getPColumnForColumnQualifier(byte[] cq) throws ColumnNotFoundException, AmbiguousColumnException {
+        return delegate.getPColumnForColumnQualifier(cq);
+    }
+
+    @Override
+    public EncodedCQCounter getEncodedCQCounter() {
+        return delegate.getEncodedCQCounter();
+    }
 }
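
The PTable additions above give tables two lookup paths: by column name
and, when the storage scheme encodes names, by encoded qualifier. A
hedged sketch with simplified types (PK columns carry no qualifier,
hence the null check; this is an illustration, not the Phoenix
implementation):

    import java.util.HashMap;
    import java.util.Map;

    // Hedged sketch of the dual-keyed column lookup implied above.
    final class ColumnLookupSketch<C> {
        private final Map<String, C> byName = new HashMap<String, C>();
        private final Map<Integer, C> byQualifier = new HashMap<Integer, C>();

        void add(String name, Integer encodedQualifier, C column) {
            byName.put(name, column);
            if (encodedQualifier != null) {
                byQualifier.put(encodedQualifier, column);
            }
        }

        C forColumnName(String name) { return byName.get(name); }

        C forColumnQualifier(int encodedQualifier) { return byQualifier.get(encodedQualifier); }
    }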
