http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 6357e52..3cca2de 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -26,7 +26,6 @@ import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIM
 import static org.apache.phoenix.schema.PTable.IndexType.LOCAL;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
-import static 
org.apache.phoenix.util.EncodedColumnsUtil.getEncodedColumnQualifier;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
@@ -87,6 +86,7 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTable.StorageScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
@@ -248,13 +248,14 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                     ScanUtil.andFilterAtEnd(scan, new 
PageFilter(plan.getLimit()));
                 }
             }
+            
scan.setAttribute(BaseScannerRegionObserver.QUALIFIER_ENCODING_SCHEME, new 
byte[]{table.getEncodingScheme().getSerializedMetadataValue()});
             // When analyzing the table, there is no look up for key values 
being done.
             // So there is no point setting the range.
             if (EncodedColumnsUtil.setQualifierRanges(table) && 
!ScanUtil.isAnalyzeTable(scan)) {
                 Pair<Integer, Integer> range = getEncodedQualifierRange(scan, 
context);
                 if (range != null) {
-                    scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER, 
getEncodedColumnQualifier(range.getFirst()));
-                    scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER, 
getEncodedColumnQualifier(range.getSecond()));
+                    scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER, 
Bytes.toBytes(range.getFirst()));
+                    scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER, 
Bytes.toBytes(range.getSecond()));
                 }
             }
             if (optimizeProjection) {
@@ -266,25 +267,25 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
     private static Pair<Integer, Integer> getEncodedQualifierRange(Scan scan, 
StatementContext context)
             throws SQLException {
         PTable table = context.getCurrentTable().getTable();
-        StorageScheme storageScheme = table.getStorageScheme();
-        checkArgument(storageScheme == 
StorageScheme.ONE_CELL_PER_KEYVALUE_COLUMN,
+        QualifierEncodingScheme encodingScheme = table.getEncodingScheme();
+        checkArgument(encodingScheme != 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS,
             "Method should only be used for tables using encoded column 
names");
         Pair<Integer, Integer> minMaxQualifiers = new Pair<>();
         for (Pair<byte[], byte[]> whereCol : 
context.getWhereConditionColumns()) {
             byte[] cq = whereCol.getSecond();
             if (cq != null) {
-                int qualifier = getEncodedColumnQualifier(cq);
+                int qualifier = table.getEncodingScheme().getDecodedValue(cq);
                 determineQualifierRange(qualifier, minMaxQualifiers);
             }
         }
         Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
 
-        Map<String, Pair<Integer, Integer>> qualifierRanges = 
EncodedColumnsUtil.getQualifierRanges(table);
+        Map<String, Pair<Integer, Integer>> qualifierRanges = 
EncodedColumnsUtil.getFamilyQualifierRanges(table);
         for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) 
{
             if (entry.getValue() != null) {
                 for (byte[] cq : entry.getValue()) {
                     if (cq != null) {
-                        int qualifier = getEncodedColumnQualifier(cq);
+                        int qualifier = 
table.getEncodingScheme().getDecodedValue(cq);
                         determineQualifierRange(qualifier, minMaxQualifiers);
                     }
                 }
@@ -299,8 +300,10 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                     family = IndexUtil.getLocalIndexColumnFamily(family);
                 }
                 Pair<Integer, Integer> range = qualifierRanges.get(family);
-                determineQualifierRange(range.getFirst(), minMaxQualifiers);
-                determineQualifierRange(range.getSecond(), minMaxQualifiers);
+                if (range != null) {
+                    determineQualifierRange(range.getFirst(), 
minMaxQualifiers);
+                    determineQualifierRange(range.getSecond(), 
minMaxQualifiers);
+                }
             }
         }
         if (minMaxQualifiers.getFirst() == null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index 239539f..4b89133 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
@@ -37,11 +38,13 @@ public class RegionScannerResultIterator extends 
BaseResultIterator {
     private final RegionScanner scanner;
     private final Pair<Integer, Integer> minMaxQualifiers;
     private final boolean useQualifierAsIndex;
+    private final QualifierEncodingScheme encodingScheme;
     
-    public RegionScannerResultIterator(RegionScanner scanner, Pair<Integer, 
Integer> minMaxQualifiers) {
+    public RegionScannerResultIterator(RegionScanner scanner, Pair<Integer, 
Integer> minMaxQualifiers, QualifierEncodingScheme encodingScheme) {
         this.scanner = scanner;
         this.useQualifierAsIndex = 
EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
         this.minMaxQualifiers = minMaxQualifiers;
+        this.encodingScheme = encodingScheme;
     }
     
     @Override
@@ -51,7 +54,7 @@ public class RegionScannerResultIterator extends 
BaseResultIterator {
         synchronized (scanner) {
             try {
                 // TODO: size
-                List<Cell> results = useQualifierAsIndex ? new 
EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), 
minMaxQualifiers.getSecond()) :  new ArrayList<Cell>();
+                List<Cell> results = useQualifierAsIndex ? new 
EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), 
minMaxQualifiers.getSecond(), encodingScheme) :  new ArrayList<Cell>();
                 // Results are potentially returned even when the return value 
of s.next is false
                 // since this is an indication of whether or not there are 
more values after the
                 // ones returned

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 40952cd..fd8ac8b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -324,8 +324,8 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     public static final byte[] STORAGE_SCHEME_BYTES = 
Bytes.toBytes(STORAGE_SCHEME);
     public static final String ENCODING_SCHEME = "ENCODING_SCHEME";
     public static final byte[] ENCODING_SCHEME_BYTES = 
Bytes.toBytes(ENCODING_SCHEME);
-    public static final String ENCODED_COLUMN_QUALIFIER = "COLUMN_QUALIFIER";
-    public static final byte[] ENCODED_COLUMN_QUALIFIER_BYTES = 
Bytes.toBytes(ENCODED_COLUMN_QUALIFIER);
+    public static final String COLUMN_QUALIFIER = "COLUMN_QUALIFIER";
+    public static final byte[] COLUMN_QUALIFIER_BYTES = 
Bytes.toBytes(COLUMN_QUALIFIER);
     public static final String COLUMN_QUALIFIER_COUNTER = "QUALIFIER_COUNTER";
     public static final byte[] COLUMN_QUALIFIER_COUNTER_BYTES = 
Bytes.toBytes(COLUMN_QUALIFIER_COUNTER);
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index a6a57c7..2ef2127 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -220,7 +220,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> 
extends Mapper<LongWri
                 byte[] cq;
                 if (!SchemaUtil.isPKColumn(c)) {
                     family = c.getFamilyName().getBytes();
-                    cq = EncodedColumnsUtil.getColumnQualifier(c, table);
+                    cq = c.getColumnQualifierBytes();
                 } else {
                     // TODO: samarth verify if this is the right thing to do 
here.
                     cq = c.getName().getBytes();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index c5f690b..c529afe 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -102,7 +102,7 @@ public class FormatToKeyValueReducer
                 byte[] cq;
                 if (!SchemaUtil.isPKColumn(c)) {
                     family = c.getFamilyName().getBytes();
-                    cq = EncodedColumnsUtil.getColumnQualifier(c, table);
+                    cq = c.getColumnQualifierBytes();
                 } else {
                     // TODO: samarth verify if this is the right thing to do 
here.
                     cq = c.getName().getBytes();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index fe0d9cb..54d9c74 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -31,6 +31,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE;
@@ -41,7 +42,6 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT;
@@ -108,7 +108,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
-import static 
org.apache.phoenix.util.EncodedColumnsUtil.getEncodedColumnQualifier;
+import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.FOUR_BYTE_QUALIFIERS;
 
 import java.math.BigDecimal;
 
@@ -165,14 +165,18 @@ public interface QueryConstants {
     public static final byte[] RESERVED_COLUMN_FAMILY_BYTES = 
Bytes.toBytes(RESERVED_COLUMN_FAMILY);
     
     public static final byte[] VALUE_COLUMN_FAMILY = 
RESERVED_COLUMN_FAMILY_BYTES;
-    public static final byte[] VALUE_COLUMN_QUALIFIER = 
getEncodedColumnQualifier(1);
+    // TODO: samarth think about the implications of using the four byte scheme 
here. Can we just
+    // get away with storing them in a single byte? We would need to make our 
encoding schemes
+    // aware that not all bytes may be present, so that they remain 
interoperable.
+    // In other words, allow upcasting but not downcasting.
+    public static final byte[] VALUE_COLUMN_QUALIFIER = 
FOUR_BYTE_QUALIFIERS.getEncodedBytes(1);
     
     public static final byte[] ARRAY_VALUE_COLUMN_FAMILY = 
RESERVED_COLUMN_FAMILY_BYTES;
-    public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = 
getEncodedColumnQualifier(2);
+    public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = 
FOUR_BYTE_QUALIFIERS.getEncodedBytes(2);
     
     public final static PName SINGLE_COLUMN_NAME = 
PNameFactory.newNormalizedName("s");
     public final static PName SINGLE_COLUMN_FAMILY_NAME = 
PNameFactory.newNormalizedName("s");
-    public final static byte[] SINGLE_COLUMN = getEncodedColumnQualifier(3);
+    public final static byte[] SINGLE_COLUMN = 
FOUR_BYTE_QUALIFIERS.getEncodedBytes(3);
     public final static byte[] SINGLE_COLUMN_FAMILY = 
RESERVED_COLUMN_FAMILY_BYTES;
 
     /** END Set of reserved column qualifiers **/
@@ -204,7 +208,7 @@ public interface QueryConstants {
     public static final ImmutableBytesPtr EMPTY_COLUMN_BYTES_PTR = new 
ImmutableBytesPtr(
             EMPTY_COLUMN_BYTES);
     public static final Integer ENCODED_EMPTY_COLUMN_NAME = 0;
-    public static final byte[] ENCODED_EMPTY_COLUMN_BYTES = 
getEncodedColumnQualifier(ENCODED_EMPTY_COLUMN_NAME);
+    public static final byte[] ENCODED_EMPTY_COLUMN_BYTES = 
FOUR_BYTE_QUALIFIERS.getEncodedBytes(ENCODED_EMPTY_COLUMN_NAME);
     public static final ImmutableBytesPtr ENCODED_EMPTY_COLUMN_BYTES_PTR = new 
ImmutableBytesPtr(
             ENCODED_EMPTY_COLUMN_BYTES);
     public final static String EMPTY_COLUMN_VALUE = "x";
@@ -318,7 +322,7 @@ public interface QueryConstants {
             AUTO_PARTITION_SEQ + " VARCHAR," +
             APPEND_ONLY_SCHEMA + " BOOLEAN," +
             GUIDE_POSTS_WIDTH + " BIGINT," +
-            ENCODED_COLUMN_QUALIFIER + " UNSIGNED_INT," +
+            COLUMN_QUALIFIER + " VARBINARY," +
             STORAGE_SCHEME + " TINYINT, " +
             ENCODING_SCHEME + " TINYINT, " +
             COLUMN_QUALIFIER_COUNTER + " INTEGER, " +

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
index d757322..c997074 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.schema;
 
-import static 
org.apache.phoenix.util.EncodedColumnsUtil.usesEncodedColumnNames;
-
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.Arrays;
@@ -38,7 +36,6 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.schema.PTable.StorageScheme;
-import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
@@ -129,7 +126,7 @@ public class ColumnRef {
         }
 
         Expression expression = table.getStorageScheme() == 
StorageScheme.ONE_CELL_PER_COLUMN_FAMILY ? 
-                       new ArrayColumnExpression(column, displayName, 
EncodedColumnsUtil.usesEncodedColumnNames(table)) : new 
KeyValueColumnExpression(column, displayName, usesEncodedColumnNames(table));
+                       new ArrayColumnExpression(column, displayName, 
table.getEncodingScheme()) : new KeyValueColumnExpression(column, displayName);
 
         if (column.getExpressionStr() != null) {
             String url = PhoenixRuntime.JDBC_PROTOCOL
@@ -146,7 +143,6 @@ public class ColumnRef {
                 return new DefaultValueExpression(Arrays.asList(expression, 
defaultExpression));
             }
         }
-       
         return expression;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
index cd9c2c0..c220ed0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
@@ -101,7 +101,7 @@ public class DelegateColumn extends DelegateDatum 
implements PColumn {
            return getDelegate().equals(o);
        }
     @Override
-    public Integer getEncodedColumnQualifier() {
-        return getDelegate().getEncodedColumnQualifier();
+    public byte[] getColumnQualifierBytes() {
+        return getDelegate().getColumnQualifierBytes();
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 06aa479..495969f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -34,6 +34,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
@@ -42,7 +43,6 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH;
@@ -363,7 +363,7 @@ public class MetaDataClient {
                     PK_NAME + "," +  // write this both in the column and 
table rows for access by metadata APIs
                     KEY_SEQ + "," +
                     COLUMN_DEF + "," +
-                    ENCODED_COLUMN_QUALIFIER + ", " +
+                    COLUMN_QUALIFIER + ", " +
                     IS_ROW_TIMESTAMP +
                     ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?)";
     private static final String INSERT_COLUMN_ALTER_TABLE =
@@ -386,7 +386,7 @@ public class MetaDataClient {
                     PK_NAME + "," +  // write this both in the column and 
table rows for access by metadata APIs
                     KEY_SEQ + "," +
                     COLUMN_DEF + "," +
-                    ENCODED_COLUMN_QUALIFIER +
+                    COLUMN_QUALIFIER +
                     ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?)";
     private static final String UPDATE_COLUMN_POSITION =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_CATALOG_TABLE + "\" ( " +
@@ -834,10 +834,10 @@ public class MetaDataClient {
         } else {
             colUpsert.setString(18, column.getExpressionStr());
         }
-        if (column.getEncodedColumnQualifier() == null) {
-            colUpsert.setNull(19, Types.INTEGER);
+        if (column.getColumnQualifierBytes() == null) {
+            colUpsert.setNull(19, Types.VARBINARY);
         } else {
-            colUpsert.setInt(19, column.getEncodedColumnQualifier());
+            colUpsert.setBytes(19, column.getColumnQualifierBytes());
         }
         if (colUpsert.getParameterMetaData().getParameterCount() > 19) {
             colUpsert.setBoolean(20, column.isRowTimestamp());
@@ -859,7 +859,7 @@ public class MetaDataClient {
         argUpsert.execute();
     }
 
-    private PColumn newColumn(int position, ColumnDef def, 
PrimaryKeyConstraint pkConstraint, String defaultColumnFamily, boolean 
addingToPK, Integer encodedColumnQualifier) throws SQLException {
+    private PColumn newColumn(int position, ColumnDef def, 
PrimaryKeyConstraint pkConstraint, String defaultColumnFamily, boolean 
addingToPK, byte[] columnQualifierBytes) throws SQLException {
         try {
             ColumnName columnDefName = def.getColumnDefName();
             SortOrder sortOrder = def.getSortOrder();
@@ -908,7 +908,7 @@ public class MetaDataClient {
                 isNull = false;
             }
             PColumn column = new PColumnImpl(PNameFactory.newName(columnName), 
familyName, def.getDataType(),
-                    def.getMaxLength(), def.getScale(), isNull, position, 
sortOrder, def.getArraySize(), null, false, def.getExpression(), 
isRowTimestamp, false, isPK ? null : encodedColumnQualifier);
+                    def.getMaxLength(), def.getScale(), isNull, position, 
sortOrder, def.getArraySize(), null, false, def.getExpression(), 
isRowTimestamp, false, columnQualifierBytes);
             return column;
         } catch (IllegalArgumentException e) { // Based on precondition check 
in constructor
             throw new SQLException(e);
@@ -2068,10 +2068,10 @@ public class MetaDataClient {
                  * partitioned by the virtue of indexId present in the row 
key. As such, different shared indexes can use
                  * potentially overlapping column qualifiers.
                  * 
-                 * If the hbase table already exists, then possibly encoded or 
non-encoded column qualifiers already exist. 
-                 * In this case we pursue ahead with non-encoded column 
qualifier scheme. If the phoenix table already exists 
+                 * If the hbase table already exists, then either encoded or 
non-encoded column qualifiers may have been used. 
+                 * In this case we proceed with the non-encoded column 
qualifier scheme. If the phoenix metadata for this table already exists 
                  * then we rely on the PTable, with appropriate storage 
scheme, returned in the MetadataMutationResult to be updated 
-                 * in the client cache. If the phoenix table already doesn't 
exist then the non-encoded column qualifier scheme works
+                 * in the client cache. If the phoenix table metadata does not 
already exist then the non-encoded column qualifier scheme works
                  * because we cannot control the column qualifiers that were 
used when populating the hbase table.
                  * TODO: samarth add a test case for this
                  */
@@ -2136,8 +2136,9 @@ public class MetaDataClient {
                         cqCounterFamily = defaultFamilyName != null ? 
defaultFamilyName : DEFAULT_COLUMN_FAMILY;
                     }
                 }
-                Integer encodedCQ =  isPkColumn ? null : 
cqCounter.getNextQualifier(cqCounterFamily);   
-                PColumn column = newColumn(position++, colDef, pkConstraint, 
defaultFamilyName, false, encodedCQ);
+                Integer encodedCQ =  isPkColumn ? null : 
cqCounter.getNextQualifier(cqCounterFamily);
+                byte[] columnQualifierBytes = 
EncodedColumnsUtil.getColumnQualifierBytes(columnDefName.getColumnName(), 
encodedCQ, encodingScheme);
+                PColumn column = newColumn(position++, colDef, pkConstraint, 
defaultFamilyName, false, columnQualifierBytes);
                 if (cqCounter.increment(cqCounterFamily)) {
                     changedCqCounters.put(cqCounterFamily, 
cqCounter.getNextQualifier(cqCounterFamily));
                 }
@@ -2710,7 +2711,7 @@ public class MetaDataClient {
                                 PTable viewIndexTable = new PTableImpl(null,
                                         
SchemaUtil.getSchemaNameFromFullName(viewIndexPhysicalName),
                                         
SchemaUtil.getTableNameFromFullName(viewIndexPhysicalName), ts,
-                                        
table.getColumnFamilies(),table.isNamespaceMapped());
+                                        
table.getColumnFamilies(),table.isNamespaceMapped(), table.getStorageScheme(), 
table.getEncodingScheme());
                                 tableRefs.add(new TableRef(null, 
viewIndexTable, ts, false));
                             }
                         }
@@ -3181,7 +3182,8 @@ public class MetaDataClient {
                                         
cqCounterToUse.getNextQualifier(familyName));
                                 }
                             }
-                            PColumn column = newColumn(position++, colDef, 
PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : 
table.getDefaultFamilyName().getString(), true, encodedCQ);
+                            byte[] columnQualifierBytes = 
EncodedColumnsUtil.getColumnQualifierBytes(colDef.getColumnDefName().getColumnName(),
 encodedCQ, table);
+                            PColumn column = newColumn(position++, colDef, 
PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : 
table.getDefaultFamilyName().getString(), true, columnQualifierBytes);
                             columns.add(column);
                             String pkName = null;
                             Short keySeq = null;
@@ -3356,7 +3358,7 @@ public class MetaDataClient {
                             PTable viewIndexTable = new PTableImpl(null,
                                     
SchemaUtil.getSchemaNameFromFullName(viewIndexPhysicalName),
                                     
SchemaUtil.getTableNameFromFullName(viewIndexPhysicalName), ts,
-                                    table.getColumnFamilies(), 
table.isNamespaceMapped());
+                                    table.getColumnFamilies(), 
table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodingScheme());
                             List<TableRef> tableRefs = 
Collections.singletonList(new TableRef(null, viewIndexTable, ts, false));
                             MutationPlan plan = new 
PostDDLCompiler(connection).compile(tableRefs, null, null,
                                     Collections.<PColumn> emptyList(), ts);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
index 978ded7..9e26227 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
@@ -61,5 +61,5 @@ public interface PColumn extends PDatum {
     
     boolean isDynamic();
     
-    Integer getEncodedColumnQualifier();
+    byte[] getColumnQualifierBytes();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
index c175aa2..d1a35cf 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
@@ -17,11 +17,14 @@
  */
 package org.apache.phoenix.schema;
 
+import static 
org.apache.phoenix.util.EncodedColumnsUtil.usesEncodedColumnNames;
+
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SizedUtil;
 
 import com.google.common.base.Preconditions;
@@ -34,7 +37,7 @@ public class PColumnFamilyImpl implements PColumnFamily {
     private final List<PColumn> columns;
     private final Map<String, PColumn> columnNamesByStrings;
     private final Map<byte[], PColumn> columnNamesByBytes;
-    private final Map<byte[], PColumn> encodedColumnQualifersByBytes;
+    private final Map<byte[], PColumn> columnsByQualifiers;
     private final int estimatedSize;
 
     @Override
@@ -42,7 +45,7 @@ public class PColumnFamilyImpl implements PColumnFamily {
         return estimatedSize;
     }
     
-    public PColumnFamilyImpl(PName name, List<PColumn> columns, boolean 
useEncodedColumnNames) {
+    public PColumnFamilyImpl(PName name, List<PColumn> columns) {
         Preconditions.checkNotNull(name);
         // Include guidePosts also in estimating the size
         long estimatedSize = SizedUtil.OBJECT_SIZE + SizedUtil.POINTER_SIZE * 
5 + SizedUtil.INT_SIZE + name.getEstimatedSize() +
@@ -51,18 +54,21 @@ public class PColumnFamilyImpl implements PColumnFamily {
         this.columns = ImmutableList.copyOf(columns);
         ImmutableMap.Builder<String, PColumn> columnNamesByStringBuilder = 
ImmutableMap.builder();
         ImmutableSortedMap.Builder<byte[], PColumn> columnNamesByBytesBuilder 
= ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR);
-        ImmutableSortedMap.Builder<byte[], PColumn> 
encodedColumnQualifiersByBytesBuilder = 
ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR);
+        ImmutableSortedMap.Builder<byte[], PColumn> columnsByQualifiersBuilder 
= ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR);
         for (PColumn column : columns) {
             estimatedSize += column.getEstimatedSize();
             columnNamesByBytesBuilder.put(column.getName().getBytes(), column);
             columnNamesByStringBuilder.put(column.getName().getString(), 
column);
-            if (useEncodedColumnNames && column.getEncodedColumnQualifier() != 
null) {
-                
encodedColumnQualifiersByBytesBuilder.put(EncodedColumnsUtil.getEncodedColumnQualifier(column),
 column);
+            // In certain cases like JOIN, PK columns are assigned a column 
family. So they
+            // are not evaluated as a PK column. However, their column 
qualifier bytes are
+            // still null.
+            if (!SchemaUtil.isPKColumn(column) && 
column.getColumnQualifierBytes() != null) {
+                
columnsByQualifiersBuilder.put(column.getColumnQualifierBytes(), column);
             }
         }
         this.columnNamesByBytes = columnNamesByBytesBuilder.build();
         this.columnNamesByStrings = columnNamesByStringBuilder.build();
-        this.encodedColumnQualifersByBytes =  
encodedColumnQualifiersByBytesBuilder.build();
+        this.columnsByQualifiers =  columnsByQualifiersBuilder.build();
         this.estimatedSize = (int)estimatedSize;
     }
     
@@ -94,16 +100,21 @@ public class PColumnFamilyImpl implements PColumnFamily {
         return column;
     }
     
+    
+    //TODO: FIXME: samarth think about backward compatibility here since older 
tables won't have column qualifiers in their metadata
     @Override
     public PColumn getPColumnForColumnQualifier(byte[] cq) throws 
ColumnNotFoundException {
         Preconditions.checkNotNull(cq);
-        PColumn column = encodedColumnQualifersByBytes.get(cq);
-        if (column == null) {
-            // For tables with non-encoded column names, column qualifiers are
-            // column name bytes. Also dynamic columns don't have encoded 
column
-            // qualifiers. So they could be found in the column name by bytes 
map.
-            return getPColumnForColumnNameBytes(cq);
-        }
-        return column;
+        return columnsByQualifiers.get(cq);
+//        if (encodedColumnQualifiers) {
+//            return columnsByQualifiers.get(cq);
+//        }
+//        /*
+//         * For tables with non-encoded column names, column qualifiers are
+//         * column name bytes. Also dynamic columns don't have encoded column
+//         * qualifiers. So they could be found in the column name by bytes 
map.
+//         */
+//        return getPColumnForColumnNameBytes(cq);
+
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
index dbb96d5..16054b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
@@ -40,7 +40,7 @@ public class PColumnImpl implements PColumn {
     private String expressionStr;
     private boolean isRowTimestamp;
     private boolean isDynamic;
-    private Integer columnQualifier;
+    private byte[] columnQualifierBytes;
     
     public PColumnImpl() {
     }
@@ -52,13 +52,13 @@ public class PColumnImpl implements PColumn {
                        Integer scale,
                        boolean nullable,
                        int position,
-                       SortOrder sortOrder, Integer arrSize, byte[] 
viewConstant, boolean isViewReferenced, String expressionStr, boolean 
isRowTimestamp, boolean isDynamic, Integer columnQualifier) {
-        init(name, familyName, dataType, maxLength, scale, nullable, position, 
sortOrder, arrSize, viewConstant, isViewReferenced, expressionStr, 
isRowTimestamp, isDynamic, columnQualifier);
+                       SortOrder sortOrder, Integer arrSize, byte[] 
viewConstant, boolean isViewReferenced, String expressionStr, boolean 
isRowTimestamp, boolean isDynamic, byte[] columnQualifierBytes) {
+        init(name, familyName, dataType, maxLength, scale, nullable, position, 
sortOrder, arrSize, viewConstant, isViewReferenced, expressionStr, 
isRowTimestamp, isDynamic, columnQualifierBytes);
     }
 
     public PColumnImpl(PColumn column, int position) {
         this(column.getName(), column.getFamilyName(), column.getDataType(), 
column.getMaxLength(),
-                column.getScale(), column.isNullable(), position, 
column.getSortOrder(), column.getArraySize(), column.getViewConstant(), 
column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), 
column.isDynamic(), column.getEncodedColumnQualifier());
+                column.getScale(), column.isNullable(), position, 
column.getSortOrder(), column.getArraySize(), column.getViewConstant(), 
column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), 
column.isDynamic(), column.getColumnQualifierBytes());
     }
 
     private void init(PName name,
@@ -70,7 +70,7 @@ public class PColumnImpl implements PColumn {
             int position,
             SortOrder sortOrder,
             Integer arrSize,
-            byte[] viewConstant, boolean isViewReferenced, String 
expressionStr, boolean isRowTimestamp, boolean isDynamic, Integer 
columnQualifier) {
+            byte[] viewConstant, boolean isViewReferenced, String 
expressionStr, boolean isRowTimestamp, boolean isDynamic, byte[] 
columnQualifierBytes) {
        Preconditions.checkNotNull(sortOrder);
         this.dataType = dataType;
         if (familyName == null) {
@@ -95,7 +95,7 @@ public class PColumnImpl implements PColumn {
         this.expressionStr = expressionStr;
         this.isRowTimestamp = isRowTimestamp;
         this.isDynamic = isDynamic;
-        this.columnQualifier = columnQualifier;
+        this.columnQualifierBytes = columnQualifierBytes;
     }
 
     @Override
@@ -209,8 +209,8 @@ public class PColumnImpl implements PColumn {
     }
     
     @Override
-    public Integer getEncodedColumnQualifier() {
-        return columnQualifier;
+    public byte[] getColumnQualifierBytes() {
+        return columnQualifierBytes;
     }
 
     /**
@@ -258,12 +258,12 @@ public class PColumnImpl implements PColumn {
         if (column.hasIsDynamic()) {
                isDynamic = column.getIsDynamic();
         }
-        Integer columnQualifier = null;
-        if (column.hasEncodedColumnQualifier()) {
-            columnQualifier = column.getEncodedColumnQualifier();
+        byte[] columnQualifierBytes = null;
+        if (column.hasColumnQualifierBytes()) {
+            columnQualifierBytes = 
column.getColumnQualifierBytes().toByteArray();
         }
         return new PColumnImpl(columnName, familyName, dataType, maxLength, 
scale, nullable, position, sortOrder,
-                arraySize, viewConstant, isViewReferenced, expressionStr, 
isRowTimestamp, isDynamic, columnQualifier);
+                arraySize, viewConstant, isViewReferenced, expressionStr, 
isRowTimestamp, isDynamic, columnQualifierBytes);
     }
 
     public static PTableProtos.PColumn toProto(PColumn column) {
@@ -294,8 +294,8 @@ public class PColumnImpl implements PColumn {
             builder.setExpression(column.getExpressionStr());
         }
         builder.setIsRowTimestamp(column.isRowTimestamp());
-        if (column.getEncodedColumnQualifier() != null) {
-            
builder.setEncodedColumnQualifier(column.getEncodedColumnQualifier());
+        if (column.getColumnQualifierBytes() != null) {
+            
builder.setColumnQualifierBytes(ByteStringer.wrap(column.getColumnQualifierBytes()));
         }
         return builder.build();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index 0263b09..169e78d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -214,7 +214,7 @@ public class PMetaDataImpl implements PMetaData {
             // Update position of columns that follow removed column
             for (int i = position+1; i < oldColumns.size(); i++) {
                 PColumn oldColumn = oldColumns.get(i);
-                PColumn newColumn = new PColumnImpl(oldColumn.getName(), 
oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), 
oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, 
oldColumn.getSortOrder(), oldColumn.getArraySize(), 
oldColumn.getViewConstant(), oldColumn.isViewReferenced(), 
oldColumn.getExpressionStr(), oldColumn.isRowTimestamp(), 
oldColumn.isDynamic(), oldColumn.getEncodedColumnQualifier());
+                PColumn newColumn = new PColumnImpl(oldColumn.getName(), 
oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), 
oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, 
oldColumn.getSortOrder(), oldColumn.getArraySize(), 
oldColumn.getViewConstant(), oldColumn.isViewReferenced(), 
oldColumn.getExpressionStr(), oldColumn.isRowTimestamp(), 
oldColumn.isDynamic(), oldColumn.getColumnQualifierBytes());
                 columns.add(newColumn);
             }
             

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 184a588..4c85e48 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -190,194 +190,167 @@ public interface PTable extends PMetaDataEntity {
         }
     }
     
-    public static class QualifierEncodingScheme<E> implements 
QualifierEncoderDecoder<E> {
-        
-        public static final QualifierEncodingScheme NON_ENCODED_QUALIFIERS = 
new QualifierEncodingScheme<String>((byte)0, "NON_ENCODED_QUALIFIERS", null) {
+    public enum QualifierEncodingScheme implements QualifierEncoderDecoder {
+        NON_ENCODED_QUALIFIERS((byte)0, null) {
             @Override
-            public byte[] getEncodedBytes(String value) {
-                return Bytes.toBytes(value);
+            public byte[] getEncodedBytes(int value) {
+                throw new UnsupportedOperationException();
             }
 
             @Override
-            public String getDecodedValue(byte[] bytes) {
-                return Bytes.toString(bytes);
+            public int getDecodedValue(byte[] bytes) {
+                throw new UnsupportedOperationException();
             }
 
             @Override
-            public String getDecodedValue(byte[] bytes, int offset, int 
length) {
-                return Bytes.toString(bytes, offset, length);
+            public int getDecodedValue(byte[] bytes, int offset, int length) {
+                throw new UnsupportedOperationException();
             }
             
             @Override
-            public boolean isEncodeable(String value) {
+            public boolean isEncodeable() {
                 return true;
             }
             
             @Override
             public String toString() {
-                return "NON_ENCODED_QUALIFIERS";
+                return name();
             }
-        };
-        public static final QualifierEncodingScheme ONE_BYTE_QUALIFIERS = new 
QualifierEncodingScheme<Long>((byte)1, "ONE_BYTE_QUALIFIERS", 255l) {
+        },
+        ONE_BYTE_QUALIFIERS((byte)1, 255) {
             @Override
-            public byte[] getEncodedBytes(Long value) {
+            public byte[] getEncodedBytes(int value) {
                 throw new UnsupportedOperationException();
             }
 
             @Override
-            public Long getDecodedValue(byte[] bytes) {
+            public int getDecodedValue(byte[] bytes) {
                 throw new UnsupportedOperationException();
             }
 
             @Override
-            public Long getDecodedValue(byte[] bytes, int offset, int length) {
+            public int getDecodedValue(byte[] bytes, int offset, int length) {
                 throw new UnsupportedOperationException();
             }
             
             @Override
-            public boolean isEncodeable(Long value) {
+            public boolean isEncodeable() {
                 return true;
             }
             
             @Override
             public String toString() {
-                return "ONE_BYTE_QUALIFIERS";
+                return name();
             }
-        };
-        public static final QualifierEncodingScheme TWO_BYTE_QUALIFIERS = new 
QualifierEncodingScheme<Long>((byte)2, "TWO_BYTE_QUALIFIERS", 65535l) {
+        },
+        TWO_BYTE_QUALIFIERS((byte)2, 65535) {
             @Override
-            public byte[] getEncodedBytes(Long value) {
+            public byte[] getEncodedBytes(int value) {
                 throw new UnsupportedOperationException();
             }
 
             @Override
-            public Long getDecodedValue(byte[] bytes) {
+            public int getDecodedValue(byte[] bytes) {
                 throw new UnsupportedOperationException();
             }
 
             @Override
-            public Long getDecodedValue(byte[] bytes, int offset, int length) {
+            public int getDecodedValue(byte[] bytes, int offset, int length) {
                 throw new UnsupportedOperationException();
             }
             
             @Override
-            public boolean isEncodeable(Long value) {
+            public boolean isEncodeable() {
                 return true;
             }
             
             @Override
             public String toString() {
-                return "TWO_BYTE_QUALIFIERS";
+                return name();
             }
-        };
-        public static final QualifierEncodingScheme THREE_BYTE_QUALIFIERS = 
new QualifierEncodingScheme<Long>((byte)3, "THREE_BYTE_QUALIFIERS", 16777215l) {
+        },
+        THREE_BYTE_QUALIFIERS((byte)3, 16777215) {
             @Override
-            public byte[] getEncodedBytes(Long value) {
+            public byte[] getEncodedBytes(int value) {
                 throw new UnsupportedOperationException();
             }
 
             @Override
-            public Long getDecodedValue(byte[] bytes) {
+            public int getDecodedValue(byte[] bytes) {
                 throw new UnsupportedOperationException();
             }
 
             @Override
-            public Long getDecodedValue(byte[] bytes, int offset, int length) {
+            public int getDecodedValue(byte[] bytes, int offset, int length) {
                 throw new UnsupportedOperationException();
             }
             
             @Override
-            public boolean isEncodeable(Long value) {
+            public boolean isEncodeable() {
                 return true;
             }
             
             @Override
             public String toString() {
-                return "THREE_BYTE_QUALIFIERS";
+                return name();
             }
-        };
-        public static final QualifierEncodingScheme FOUR_BYTE_QUALIFIERS = new 
QualifierEncodingScheme<Long>((byte)4, "FOUR_BYTE_QUALIFIERS", 4294967295l) {
+        },
+        FOUR_BYTE_QUALIFIERS((byte)4, Integer.MAX_VALUE) {
             @Override
-            public byte[] getEncodedBytes(Long value) {
+            public byte[] getEncodedBytes(int value) {
                 return Bytes.toBytes(value);
             }
 
             @Override
-            public Long getDecodedValue(byte[] bytes) {
-                return Bytes.toLong(bytes);
+            public int getDecodedValue(byte[] bytes) {
+                return Bytes.toInt(bytes);
             }
 
             @Override
-            public Long getDecodedValue(byte[] bytes, int offset, int length) {
-                return Bytes.toLong(bytes, offset, length);
+            public int getDecodedValue(byte[] bytes, int offset, int length) {
+                return Bytes.toInt(bytes, offset, length);
             }
             
             @Override
-            public boolean isEncodeable(Long value) {
+            public boolean isEncodeable() {
                 return true;
             }
             
             @Override
             public String toString() {
-                return "FOUR_BYTE_QUALIFIERS";
+                return name();
             }
         };
-        public static final QualifierEncodingScheme[] schemes = 
{NON_ENCODED_QUALIFIERS, ONE_BYTE_QUALIFIERS, TWO_BYTE_QUALIFIERS, 
THREE_BYTE_QUALIFIERS, FOUR_BYTE_QUALIFIERS}; 
-        private final byte[] metadataBytes;
+        
         private final byte metadataValue;
-        private final Long maxQualifier;
-
-        private QualifierEncodingScheme(byte serializedMetadataValue, String 
name, Long maxQualifier) {
-            this.metadataValue = serializedMetadataValue;
-            this.metadataBytes = Bytes.toBytes(name);
-            this.maxQualifier = maxQualifier;
-        }
-
-        public byte[] getMetadataBytes() {
-            return metadataBytes;
-        }
-
+        private final Integer maxQualifier;
+        
         public byte getSerializedMetadataValue() {
             return this.metadataValue;
         }
 
         public static QualifierEncodingScheme fromSerializedValue(byte 
serializedValue) {
-            if (serializedValue < 0 || serializedValue >= schemes.length) {
+            if (serializedValue < 0 || serializedValue >= 
QualifierEncodingScheme.values().length) {
                 return null;
             }
-            return schemes[serializedValue];
+            return QualifierEncodingScheme.values()[serializedValue];
         }
         
-        public Long getMaxQualifier() {
+        public Integer getMaxQualifier() {
             return maxQualifier;
         }
 
-        @Override
-        public byte[] getEncodedBytes(E value) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public E getDecodedValue(byte[] bytes) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public E getDecodedValue(byte[] bytes, int offset, int length) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public boolean isEncodeable(E value) {
-            throw new UnsupportedOperationException();
+        private QualifierEncodingScheme(byte serializedMetadataValue, Integer 
maxQualifier) {
+            this.metadataValue = serializedMetadataValue;
+            this.maxQualifier = maxQualifier;
         }
     }
     
-    interface QualifierEncoderDecoder<E> {
-        byte[] getEncodedBytes(E value);
-        E getDecodedValue(byte[] bytes);
-        E getDecodedValue(byte[] bytes, int offset, int length);
-        boolean isEncodeable(E value);
+    interface QualifierEncoderDecoder {
+        byte[] getEncodedBytes(int value);
+        int getDecodedValue(byte[] bytes);
+        int getDecodedValue(byte[] bytes, int offset, int length);
+        boolean isEncodeable();
     }
 
     long getTimeStamp();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 7e6f35b..7f11faf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -20,7 +20,7 @@ package org.apache.phoenix.schema;
 import static org.apache.phoenix.hbase.index.util.KeyValueBuilder.addQuietly;
 import static 
org.apache.phoenix.hbase.index.util.KeyValueBuilder.deleteQuietly;
 import static org.apache.phoenix.schema.SaltingUtil.SALTING_COLUMN;
-import static 
org.apache.phoenix.util.EncodedColumnsUtil.getEncodedColumnQualifier;
+import static 
org.apache.phoenix.util.EncodedColumnsUtil.usesEncodedColumnNames;
 
 import java.io.IOException;
 import java.sql.DriverManager;
@@ -34,6 +34,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import javax.annotation.Nonnull;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
@@ -119,7 +121,7 @@ public class PTableImpl implements PTable {
     private Map<byte[], PColumnFamily> familyByBytes;
     private Map<String, PColumnFamily> familyByString;
     private ListMultimap<String, PColumn> columnsByName;
-    private ListMultimap<Pair<String, Integer>, PColumn> 
kvColumnsByEncodedColumnNames;
+    private ListMultimap<KVColumnFamilyQualifier, PColumn> 
kvColumnsByEncodedQualifiers;
     private PName pkName;
     private Integer bucketNum;
     private RowKeySchema rowKeySchema;
@@ -160,8 +162,33 @@ public class PTableImpl implements PTable {
         this.physicalNames = Collections.emptyList();
         this.rowKeySchema = RowKeySchema.EMPTY_SCHEMA;
     }
-
-    public PTableImpl(PName tenantId, String schemaName, String tableName, 
long timestamp, List<PColumnFamily> families, boolean isNamespaceMapped) { // 
For base table of mapped VIEW
+    
+    // Constructor used at table creation time
+    public PTableImpl(PName tenantId, String schemaName, String tableName, 
long timestamp, List<PColumnFamily> families, boolean isNamespaceMapped) {
+        Preconditions.checkArgument(tenantId==null || 
tenantId.getBytes().length > 0); // tenantId should be null or not empty
+        this.tenantId = tenantId;
+        this.name = PNameFactory.newName(SchemaUtil.getTableName(schemaName, 
tableName));
+        this.key = new PTableKey(tenantId, name.getString());
+        this.schemaName = PNameFactory.newName(schemaName);
+        this.tableName = PNameFactory.newName(tableName);
+        this.type = PTableType.VIEW;
+        this.viewType = ViewType.MAPPED;
+        this.timeStamp = timestamp;
+        this.pkColumns = this.allColumns = Collections.emptyList();
+        this.rowKeySchema = RowKeySchema.EMPTY_SCHEMA;
+        this.indexes = Collections.emptyList();
+        this.familyByBytes = Maps.newHashMapWithExpectedSize(families.size());
+        this.familyByString = Maps.newHashMapWithExpectedSize(families.size());
+        for (PColumnFamily family : families) {
+            familyByBytes.put(family.getName().getBytes(), family);
+            familyByString.put(family.getName().getString(), family);
+        }
+        this.families = families;
+        this.physicalNames = Collections.emptyList();
+        this.isNamespaceMapped = isNamespaceMapped;
+    }
+    
+    public PTableImpl(PName tenantId, String schemaName, String tableName, 
long timestamp, List<PColumnFamily> families, boolean isNamespaceMapped, 
StorageScheme storageScheme, QualifierEncodingScheme encodingScheme) { // For 
base table of mapped VIEW
         Preconditions.checkArgument(tenantId==null || 
tenantId.getBytes().length > 0); // tenantId should be null or not empty
         this.tenantId = tenantId;
         this.name = PNameFactory.newName(SchemaUtil.getTableName(schemaName, 
tableName));
@@ -183,6 +210,8 @@ public class PTableImpl implements PTable {
         this.families = families;
         this.physicalNames = Collections.emptyList();
         this.isNamespaceMapped = isNamespaceMapped;
+        this.storageScheme = storageScheme;
+        this.qualifierEncodingScheme = encodingScheme;
     }
     
     // For indexes stored in shared physical tables
@@ -452,7 +481,7 @@ public class PTableImpl implements PTable {
         PColumn[] allColumns;
         
         this.columnsByName = ArrayListMultimap.create(columns.size(), 1);
-        this.kvColumnsByEncodedColumnNames = 
(EncodedColumnsUtil.usesEncodedColumnNames(qualifierEncodingScheme) ? 
ArrayListMultimap.<Pair<String, Integer>, PColumn>create(columns.size(), 1) : 
null);
+        this.kvColumnsByEncodedQualifiers = 
ArrayListMultimap.<KVColumnFamilyQualifier, PColumn>create(columns.size(), 1);
         int numPKColumns = 0;
         if (bucketNum != null) {
             // Add salt column to allColumns and pkColumns, but don't add to
@@ -465,6 +494,7 @@ public class PTableImpl implements PTable {
             allColumns = new PColumn[columns.size()];
             pkColumns = Lists.newArrayListWithExpectedSize(columns.size());
         }
+        boolean encodedColumnQualifiers = 
usesEncodedColumnNames(qualifierEncodingScheme);
         for (PColumn column : columns) {
             allColumns[column.getPosition()] = column;
             PName familyName = column.getFamilyName();
@@ -483,20 +513,18 @@ public class PTableImpl implements PTable {
                     }
                 }
             }
-            //TODO: samarth understand the implication of this.
-            if (kvColumnsByEncodedColumnNames != null) {
-                Integer cq = column.getEncodedColumnQualifier();
-                String cf = column.getFamilyName() != null ? 
column.getFamilyName().getString() : null;
-                if (cf != null && cq != null) {
-                    Pair<String, Integer> pair = new Pair<>(cf, cq);
-                    if (kvColumnsByEncodedColumnNames.put(pair, column)) {
-                        int count = 0;
-                        for (PColumn dupColumn : 
kvColumnsByEncodedColumnNames.get(pair)) {
-                            if (Objects.equal(familyName, 
dupColumn.getFamilyName())) {
-                                count++;
-                                if (count > 1) {
-                                    throw new 
ColumnAlreadyExistsException(schemaName.getString(), name.getString(), 
columnName);
-                                }
+            byte[] cq = column.getColumnQualifierBytes();
+            String cf = column.getFamilyName() != null ? 
column.getFamilyName().getString() : null;
+            if (cf != null && cq != null && encodedColumnQualifiers) {
+                KVColumnFamilyQualifier info = new KVColumnFamilyQualifier(cf, 
cq);
+                if (kvColumnsByEncodedQualifiers.put(info, column)) {
+                    int count = 0;
+                    for (PColumn dupColumn : 
kvColumnsByEncodedQualifiers.get(info)) {
+                        if (Objects.equal(familyName, 
dupColumn.getFamilyName())) {
+                            count++;
+                            if (count > 1) {
+                                throw new 
ColumnAlreadyExistsException(schemaName.getString(),
+                                        name.getString(), columnName);
                             }
                         }
                     }
@@ -561,7 +589,7 @@ public class PTableImpl implements PTable {
                 .orderedBy(Bytes.BYTES_COMPARATOR);
         for (int i = 0; i < families.length; i++) {
             Map.Entry<PName,List<PColumn>> entry = iterator.next();
-            PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), 
entry.getValue(), 
EncodedColumnsUtil.usesEncodedColumnNames(qualifierEncodingScheme));
+            PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), 
entry.getValue());//, qualifierEncodingScheme);
             families[i] = family;
             familyByString.put(family.getName().getString(), family);
             familyByBytes.put(family.getName().getBytes(), family);
@@ -806,9 +834,8 @@ public class PTableImpl implements PTable {
             String columnName = (String)PVarchar.INSTANCE.toObject(cq);
             return getPColumnForColumnName(columnName);
         } else {
-            Integer qualifier = getEncodedColumnQualifier(cq);
             String family = (String)PVarchar.INSTANCE.toObject(cf);
-            List<PColumn> columns = kvColumnsByEncodedColumnNames.get(new 
Pair<>(family, qualifier));
+            List<PColumn> columns = kvColumnsByEncodedQualifiers.get(new 
KVColumnFamilyQualifier(family, cq));
             int size = columns.size();
             if (size == 0) {
                 //TODO: samarth should we have a column qualifier not found 
exception?
@@ -898,7 +925,8 @@ public class PTableImpl implements PTable {
                         Collection<PColumn> columns = family.getColumns();
                         int maxEncodedColumnQualifier = Integer.MIN_VALUE;
                         for (PColumn column : columns) {
-                            maxEncodedColumnQualifier = 
Math.max(maxEncodedColumnQualifier, column.getEncodedColumnQualifier());
+                            int qualifier = 
qualifierEncodingScheme.getDecodedValue(column.getColumnQualifierBytes());
+                            maxEncodedColumnQualifier = 
Math.max(maxEncodedColumnQualifier, qualifier);
                         }
                         Expression[] colValues = new 
Expression[maxEncodedColumnQualifier+1];
                         Arrays.fill(colValues, new 
DelegateExpression(LiteralExpression.newConstant(null)) {
@@ -911,7 +939,8 @@ public class PTableImpl implements PTable {
                         
colValues[0]=LiteralExpression.newConstant(QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
                         for (PColumn column : columns) {
                                if (columnToValueMap.containsKey(column)) {
-                                       
colValues[column.getEncodedColumnQualifier()] = new 
LiteralExpression(columnToValueMap.get(column));
+                                   int qualifier = 
qualifierEncodingScheme.getDecodedValue(column.getColumnQualifierBytes());
+                                       colValues[qualifier] = new 
LiteralExpression(columnToValueMap.get(column));
                                }
                         }
                         
@@ -963,7 +992,7 @@ public class PTableImpl implements PTable {
         public void setValue(PColumn column, byte[] byteValue) {
             deleteRow = null;
             byte[] family = column.getFamilyName().getBytes();
-            byte[] qualifier = getColumnQualifier(column);
+            byte[] qualifier = column.getColumnQualifierBytes();
             ImmutableBytesPtr qualifierPtr = new ImmutableBytesPtr(qualifier);
             PDataType<?> type = column.getDataType();
             // Check null, since some types have no byte representation for 
null
@@ -1041,10 +1070,6 @@ public class PTableImpl implements PTable {
             }
         }
         
-        private byte[] getColumnQualifier(PColumn column) {
-            return EncodedColumnsUtil.getColumnQualifier(column, 
PTableImpl.this);
-        }
-        
     }
 
     @Override
@@ -1510,4 +1535,39 @@ public class PTableImpl implements PTable {
     public QualifierEncodingScheme getEncodingScheme() {
         return qualifierEncodingScheme;
     }
+    
+    private static final class KVColumnFamilyQualifier { // lookup key: column family name + raw qualifier bytes, compared by content
+        @Nonnull
+        private final String colFamilyName;
+        @Nonnull
+        private final byte[] colQualifier; // held by reference; the constructor makes no defensive copy
+
+        public KVColumnFamilyQualifier(String colFamilyName, byte[] 
colQualifier) {
+            Preconditions.checkArgument(colFamilyName != null && colQualifier 
!= null,
+                "None of the arguments, column family name or column qualifier 
can be null");
+            this.colFamilyName = colFamilyName;
+            this.colQualifier = colQualifier;
+        }
+
+        @Override
+        public int hashCode() { // 31-based combination of both fields
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + colFamilyName.hashCode();
+            result = prime * result + Arrays.hashCode(colQualifier); // content hash, so equal byte sequences hash alike
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) { // equal iff same class, same family name and same qualifier bytes
+            if (this == obj) return true;
+            if (obj == null) return false;
+            if (getClass() != obj.getClass()) return false; // exact class match only
+            KVColumnFamilyQualifier other = (KVColumnFamilyQualifier) obj;
+            if (!colFamilyName.equals(other.colFamilyName)) return false;
+            if (!Arrays.equals(colQualifier, other.colQualifier)) return false; // element-wise, not reference, comparison
+            return true;
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java
index 9336938..d875982 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java
@@ -24,14 +24,16 @@ public class ProjectedColumn extends DelegateColumn {
     private final int position;
     private final boolean nullable;
     private final ColumnRef sourceColumnRef;
+    private final byte[] cq;
 
-    public ProjectedColumn(PName name, PName familyName, int position, boolean 
nullable, ColumnRef sourceColumnRef) {
+    public ProjectedColumn(PName name, PName familyName, int position, boolean 
nullable, ColumnRef sourceColumnRef, byte[] cq) {
         super(sourceColumnRef.getColumn());
         this.name = name;
         this.familyName = familyName;
         this.position = position;
         this.nullable = nullable;
         this.sourceColumnRef = sourceColumnRef;
+        this.cq = cq;
     }
     
     @Override
@@ -53,7 +55,12 @@ public class ProjectedColumn extends DelegateColumn {
     public boolean isNullable() {
         return nullable;
     }
-
+    
+    @Override
+    public byte[] getColumnQualifierBytes() { // hands back the cq array given to the constructor, as-is (no copy)
+        return cq;
+    }
+    
     public ColumnRef getSourceColumnRef() {
         return sourceColumnRef;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
index f39bb1f..dcafd84 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.schema.tuple;
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
 import static 
org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
-import static 
org.apache.phoenix.util.EncodedColumnsUtil.getEncodedColumnQualifier;
 
 import java.util.Collection;
 import java.util.ConcurrentModificationException;
@@ -32,6 +31,7 @@ import java.util.NoSuchElementException;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTable.StorageScheme;
 
 /**
@@ -64,8 +64,9 @@ public class EncodedColumnQualiferCellsList implements 
List<Cell> {
     private static final int RESERVED_RANGE_SIZE = 
ENCODED_CQ_COUNTER_INITIAL_VALUE - ENCODED_EMPTY_COLUMN_NAME;
     // Used by iterators to figure out if the list was structurally modified.
     private int modCount = 0;
+    private final QualifierEncodingScheme encodingScheme;
 
-    public EncodedColumnQualiferCellsList(int minQ, int maxQ) {
+    public EncodedColumnQualiferCellsList(int minQ, int maxQ, 
QualifierEncodingScheme encodingScheme) {
         checkArgument(minQ <= maxQ, "Invalid arguments. Min: " + minQ
                 + ". Max: " + maxQ);
         this.minQualifier = minQ;
@@ -80,6 +81,7 @@ public class EncodedColumnQualiferCellsList implements 
List<Cell> {
         }
         this.array = new Cell[size];
         this.nonReservedRangeOffset = minQ > ENCODED_CQ_COUNTER_INITIAL_VALUE 
? minQ  - ENCODED_CQ_COUNTER_INITIAL_VALUE : 0;
+        this.encodingScheme = encodingScheme;
     }
 
     @Override
@@ -131,7 +133,7 @@ public class EncodedColumnQualiferCellsList implements 
List<Cell> {
         if (e == null) {
             throw new NullPointerException();
         }
-        int columnQualifier = getEncodedColumnQualifier(e.getQualifierArray(), 
e.getQualifierOffset(), e.getQualifierLength());
+        int columnQualifier = 
encodingScheme.getDecodedValue(e.getQualifierArray(), e.getQualifierOffset(), 
e.getQualifierLength());
                 
         checkQualifierRange(columnQualifier);
         int idx = getArrayIndex(columnQualifier);
@@ -231,7 +233,7 @@ public class EncodedColumnQualiferCellsList implements 
List<Cell> {
             ListIterator<Cell> listItr = this.listIterator();
             while (listItr.hasNext()) {
                 Cell cellInThis = listItr.next();
-                int qualifier = 
getEncodedColumnQualifier(cellInThis.getQualifierArray(),
+                int qualifier = 
encodingScheme.getDecodedValue(cellInThis.getQualifierArray(),
                             cellInThis.getQualifierOffset(), 
cellInThis.getQualifierLength());
                 try {
                     Cell cellInParam = 
list.getCellForColumnQualifier(qualifier);
@@ -354,7 +356,12 @@ public class EncodedColumnQualiferCellsList implements 
List<Cell> {
         return new Itr();
     }
 
-    public Cell getCellForColumnQualifier(int columnQualifier) {
+    public Cell getCellForColumnQualifier(byte[] qualifierBytes) { // byte[] entry point: decode with this list's scheme, then use the int overload
+        int columnQualifier = encodingScheme.getDecodedValue(qualifierBytes);
+        return getCellForColumnQualifier(columnQualifier);
+    }
+    
+    private Cell getCellForColumnQualifier(int columnQualifier) {
         checkQualifierRange(columnQualifier);
         int idx = getArrayIndex(columnQualifier);
         Cell c = array[idx];
@@ -510,7 +517,7 @@ public class EncodedColumnQualiferCellsList implements 
List<Cell> {
             if (lastRet == -1) {
                 throw new IllegalStateException();
             }
-            int columnQualifier = 
getEncodedColumnQualifier(e.getQualifierArray(), e.getQualifierOffset(), 
e.getQualifierLength());                    
+            int columnQualifier = 
encodingScheme.getDecodedValue(e.getQualifierArray(), e.getQualifierOffset(), 
e.getQualifierLength());                    
             int idx = getArrayIndex(columnQualifier);
             if (idx != lastRet) {
                 throw new IllegalArgumentException("Cell " + e + " with column 
qualifier "

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
index 08cafe0..01a5e4d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
@@ -18,7 +18,6 @@
 package org.apache.phoenix.schema.tuple;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static 
org.apache.phoenix.util.EncodedColumnsUtil.getEncodedColumnQualifier;
 
 import java.util.List;
 
@@ -58,7 +57,7 @@ public class PositionBasedMultiKeyValueTuple extends 
BaseTuple {
 
     @Override
     public Cell getValue(byte[] family, byte[] qualifier) {
-        return 
values.getCellForColumnQualifier(getEncodedColumnQualifier(qualifier));
+        return values.getCellForColumnQualifier(qualifier); // raw qualifier bytes are passed through; family is not consulted here
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java
index 109cfc3..63ba101 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java
@@ -49,8 +49,7 @@ public class PositionBasedResultTuple extends BaseTuple {
 
     @Override
     public KeyValue getValue(byte[] family, byte[] qualifier) {
-        int columnQualifier = 
EncodedColumnsUtil.getEncodedColumnQualifier(qualifier);
-        return 
org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(cells.getCellForColumnQualifier(columnQualifier));
+        return 
org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(cells.getCellForColumnQualifier(qualifier)); // raw qualifier bytes go to cells; family is not consulted here
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
index a9addf0..26ffe82 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
@@ -17,7 +17,7 @@
  */
 package org.apache.phoenix.util;
 
-import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 
 import java.util.Map;
@@ -44,32 +44,6 @@ public class EncodedColumnsUtil {
     public static boolean usesEncodedColumnNames(QualifierEncodingScheme 
encodingScheme) {
         return encodingScheme != null && encodingScheme != 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
     }
-
-    public static byte[] getEncodedColumnQualifier(PColumn column) {
-        checkArgument(!SchemaUtil.isPKColumn(column), "No column qualifiers 
for PK columns");
-        checkArgument(!column.isDynamic(), "No encoded column qualifiers for 
dynamic columns");
-        //TODO: samarth this would need to use encoding scheme.
-        return Bytes.toBytes(column.getEncodedColumnQualifier());
-    }
-    
-    public static int getEncodedColumnQualifier(byte[] bytes, int offset, int 
length) {
-      //TODO: samarth this would need to use encoding scheme.
-        return Bytes.toInt(bytes, offset, length);
-    }
-    
-    public static byte[] getEncodedColumnQualifier(int value) {
-      //TODO: samarth this would need to use encoding scheme.
-        return Bytes.toBytes(value);
-    }
-    
-    public static int getEncodedColumnQualifier(byte[] bytes) {
-      //TODO: samarth this would need to use encoding scheme.
-        return Bytes.toInt(bytes);
-    }
-
-    public static byte[] getColumnQualifier(PColumn column, PTable table) {
-      return EncodedColumnsUtil.getColumnQualifier(column, 
usesEncodedColumnNames(table));
-    }
     
     public static void setColumns(PColumn column, PTable table, Scan scan) {
        if (table.getStorageScheme() == 
StorageScheme.ONE_CELL_PER_COLUMN_FAMILY) {
@@ -79,16 +53,14 @@ public class EncodedColumnsUtil {
                scan.addFamily(column.getFamilyName().getBytes());
        }
         else {
-               scan.addColumn(column.getFamilyName().getBytes(), 
EncodedColumnsUtil.getColumnQualifier(column, table));
+            if (column.getColumnQualifierBytes() != null) {
+                scan.addColumn(column.getFamilyName().getBytes(), 
column.getColumnQualifierBytes());
+            }
         }
     }
     
-    public static byte[] getColumnQualifier(PColumn column, boolean 
encodedColumnName) {
-        checkArgument(!SchemaUtil.isPKColumn(column), "No column qualifiers 
for PK columns");
-        if (column.isDynamic()) { // Dynamic column names don't have encoded 
column names
-            return column.getName().getBytes();
-        }
-        return encodedColumnName ? getEncodedColumnQualifier(column) : 
column.getName().getBytes(); 
+    public static QualifierEncodingScheme getEncodingScheme(Scan s) { // maps byte 0 of the scan's QUALIFIER_ENCODING_SCHEME attribute to a scheme. NOTE(review): indexed without a null check - confirm the attribute is always set
+        return 
QualifierEncodingScheme.fromSerializedValue(s.getAttribute(BaseScannerRegionObserver.QUALIFIER_ENCODING_SCHEME)[0]);
     }
 
     /**
@@ -110,20 +82,26 @@ public class EncodedColumnsUtil {
                 QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES) : new 
Pair<>(QueryConstants.EMPTY_COLUMN_BYTES,
                 QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
     }
-
-    public static boolean hasEncodedColumnName(PColumn column){
-        return !SchemaUtil.isPKColumn(column) && !column.isDynamic() && 
column.getEncodedColumnQualifier() != null;
+    
+    /**
+     * Selects the empty key value constants by scheme: the ENCODED_* pair when usesEncodedColumnNames(encodingScheme) is true, otherwise the plain pair.
+     * @return pair of byte arrays: first is the empty key value's column qualifier, second is the value to use for it.
+     */
+    public static Pair<byte[], byte[]> 
getEmptyKeyValueInfo(QualifierEncodingScheme encodingScheme) {
+        return usesEncodedColumnNames(encodingScheme) ? new 
Pair<>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES,
+                QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES) : new 
Pair<>(QueryConstants.EMPTY_COLUMN_BYTES,
+                QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
     }
 
     public static Pair<Integer, Integer> getMinMaxQualifiersFromScan(Scan 
scan) {
         Integer minQ = null, maxQ = null;
         byte[] minQualifier = 
scan.getAttribute(BaseScannerRegionObserver.MIN_QUALIFIER);
         if (minQualifier != null) {
-            minQ = getEncodedColumnQualifier(minQualifier);
+            minQ = Bytes.toInt(minQualifier);
         }
         byte[] maxQualifier = 
scan.getAttribute(BaseScannerRegionObserver.MAX_QUALIFIER);
         if (maxQualifier != null) {
-            maxQ = getEncodedColumnQualifier(maxQualifier);
+            maxQ = Bytes.toInt(maxQualifier);
         }
         if (minQualifier == null) {
             return null;
@@ -142,8 +120,10 @@ public class EncodedColumnsUtil {
         return minMaxQualifiers != null;
     }
 
-    public static Map<String, Pair<Integer, Integer>> 
getQualifierRanges(PTable table) {
-        Preconditions.checkArgument(table.getEncodingScheme() != 
NON_ENCODED_QUALIFIERS,
+    public static Map<String, Pair<Integer, Integer>> 
getFamilyQualifierRanges(PTable table) {
+        checkNotNull(table);
+        QualifierEncodingScheme encodingScheme = table.getEncodingScheme();
+        Preconditions.checkArgument(encodingScheme != NON_ENCODED_QUALIFIERS,
             "Use this method only for tables with encoding scheme "
                     + NON_ENCODED_QUALIFIERS);
         Map<String, Pair<Integer, Integer>> toReturn = 
Maps.newHashMapWithExpectedSize(table.getColumns().size());
@@ -151,7 +131,7 @@ public class EncodedColumnsUtil {
             if (!SchemaUtil.isPKColumn(column)) {
                 String colFamily = column.getFamilyName().getString();
                 Pair<Integer, Integer> minMaxQualifiers = 
toReturn.get(colFamily);
-                Integer encodedColumnQualifier = 
column.getEncodedColumnQualifier();
+                Integer encodedColumnQualifier = 
encodingScheme.getDecodedValue(column.getColumnQualifierBytes());
                 if (minMaxQualifiers == null) {
                     minMaxQualifiers = new Pair<>(encodedColumnQualifier, 
encodedColumnQualifier);
                     toReturn.put(colFamily, minMaxQualifiers);
@@ -167,4 +147,17 @@ public class EncodedColumnsUtil {
         return toReturn;
     }
     
+    public static byte[] getColumnQualifierBytes(String columnName, Integer 
numberBasedQualifier, PTable table) {
+        QualifierEncodingScheme encodingScheme = table.getEncodingScheme(); // convenience overload: take the scheme from the table, then delegate
+        return getColumnQualifierBytes(columnName, numberBasedQualifier, 
encodingScheme);
+    }
+    
+    public static byte[] getColumnQualifierBytes(String columnName, Integer 
numberBasedQualifier, QualifierEncodingScheme encodingScheme) {
+        if (encodingScheme == NON_ENCODED_QUALIFIERS) { // non-encoded: the qualifier is the column name's bytes
+            return Bytes.toBytes(columnName);
+        } else if (numberBasedQualifier != null) { // any other scheme, with a numeric qualifier supplied: encode it
+            return encodingScheme.getEncodedBytes(numberBasedQualifier);
+        }
+        return null; // other scheme but no numeric qualifier was supplied; callers must handle null
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index d816e3b..00d355c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.util;
 import static 
org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX;
 import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER;
-import static 
org.apache.phoenix.util.EncodedColumnsUtil.getEncodedColumnQualifier;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -84,9 +83,9 @@ import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTable.StorageScheme;
 import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema.Field;
@@ -95,7 +94,6 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDecimal;
-import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.tephra.TxConstants;
@@ -435,14 +433,15 @@ public class IndexUtil {
     public static TupleProjector getTupleProjector(Scan scan, 
ColumnReference[] dataColumns) {
         if (dataColumns != null && dataColumns.length != 0) {
             KeyValueSchema keyValueSchema = 
deserializeLocalIndexJoinSchemaFromScan(scan); 
-            boolean storeColsInSingleCell = 
scan.getAttribute(BaseScannerRegionObserver.COLUMNS_STORED_IN_SINGLE_CELL)!=null;
+            boolean storeColsInSingleCell = 
scan.getAttribute(BaseScannerRegionObserver.COLUMNS_STORED_IN_SINGLE_CELL) != 
null;
+            QualifierEncodingScheme scheme = 
EncodedColumnsUtil.getEncodingScheme(scan);
             Expression[] colExpressions = storeColsInSingleCell ? new 
ArrayColumnExpression[dataColumns.length] : new 
KeyValueColumnExpression[dataColumns.length];
             for (int i = 0; i < dataColumns.length; i++) {
                 byte[] family = dataColumns[i].getFamily();
                 byte[] qualifier = dataColumns[i].getQualifier();
                 Field field = keyValueSchema.getField(i);
                 Expression dataColumnExpr =
-                        storeColsInSingleCell ? new 
ArrayColumnExpression(field, family, getEncodedColumnQualifier(qualifier)) 
+                        storeColsInSingleCell ? new 
ArrayColumnExpression(field, family, qualifier, scheme)
                             : new KeyValueColumnExpression(field, family, 
qualifier);
                 colExpressions[i] = dataColumnExpr;
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
index 06c20d3..e10b940 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
@@ -121,7 +121,7 @@ public class WhereCompilerTest extends 
BaseConnectionlessQueryTest {
         Filter filter = scan.getFilter();
         Expression idExpression = new ColumnRef(plan.getTableRef(), 
plan.getTableRef().getTable().getPColumnForColumnName("ID").getPosition()).newColumnExpression();
         Expression id = new RowKeyColumnExpression(idExpression,new 
RowKeyValueAccessor(plan.getTableRef().getTable().getPKColumns(),0));
-        Expression company = new 
KeyValueColumnExpression(plan.getTableRef().getTable().getPColumnForColumnName("COMPANY"),
 true);
+        Expression company = new 
KeyValueColumnExpression(plan.getTableRef().getTable().getPColumnForColumnName("COMPANY"));
         // FilterList has no equals implementation
         assertTrue(filter instanceof FilterList);
         FilterList filterList = (FilterList)filter;
@@ -153,7 +153,7 @@ public class WhereCompilerTest extends 
BaseConnectionlessQueryTest {
         assertEquals(
                 singleKVFilter(constantComparison(
                     CompareOp.EQUAL,
-                    new KeyValueColumnExpression(column, true),
+                    new KeyValueColumnExpression(column),
                     "c3")),
                 filter);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a41074a9/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
index 30f8970..c2d02ec 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
@@ -252,9 +252,10 @@ public class CorrelatePlanTest {
         for (int i = 0; i < row.length; i++) {
             String name = ParseNodeFactory.createTempAlias();
             Expression expr = LiteralExpression.newConstant(row[i]);
+            PName colName = PNameFactory.newName(name);
             columns.add(new PColumnImpl(PNameFactory.newName(name), 
PNameFactory.newName(VALUE_COLUMN_FAMILY),
                     expr.getDataType(), expr.getMaxLength(), expr.getScale(), 
expr.isNullable(),
-                    i, expr.getSortOrder(), null, null, false, name, false, 
false, null));
+                    i, expr.getSortOrder(), null, null, false, name, false, 
false, colName.getBytes()));
         }
         try {
             PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, 
PName.EMPTY_NAME,

Reply via email to