This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8808e9c  [CARBONDATA-3731] Avoid data copy in Writer
8808e9c is described below

commit 8808e9c65b404007d0b553e6f722a5a83aef4b8d
Author: Jacky Li <jacky.li...@qq.com>
AuthorDate: Sat Feb 29 14:38:38 2020 +0800

    [CARBONDATA-3731] Avoid data copy in Writer
    
    Why is this PR needed?
    For variable-length columns such as String and Binary, there are
    currently 5 data copies during the data write process, in
    CarbonFactDataHandlerColumnar.processDataRows
    
    What changes were proposed in this PR?
    This PR avoids unnecessary copies:
    
    reduce from 5 copies to 1 copy by using DirectByteBuffer: copy data
    into a column page backed by a ByteBuffer (DirectBuffer)
    use Snappy to compress the DirectBuffer directly and output the
    compressed data into another ByteBuffer (DirectBuffer)
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3638
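
    Illustration (not part of this commit): a minimal sketch of the
    single-copy flow described above, assuming the snappy-java API
    (Snappy.maxCompressedLength and Snappy.compress over direct
    ByteBuffers); the class and variable names here are hypothetical.

        import java.nio.ByteBuffer;
        import java.nio.charset.StandardCharsets;

        import org.xerial.snappy.Snappy;

        public final class DirectBufferWriteSketch {
          public static void main(String[] args) throws Exception {
            byte[] value = "hello carbondata".getBytes(StandardCharsets.UTF_8);

            // single copy: LV-encode the value straight into a DirectBuffer,
            // as LVByteBufferColumnPage.putBytes does for STRING columns
            ByteBuffer page = ByteBuffer.allocateDirect(1024);
            page.putShort((short) value.length);  // L: 2-byte length for STRING
            page.put(value);                      // V: the value bytes

            // compress the DirectBuffer directly into another DirectBuffer,
            // without flattening the page into an intermediate byte[]
            page.flip();
            ByteBuffer compressed =
                ByteBuffer.allocateDirect(Snappy.maxCompressedLength(page.remaining()));
            int compressedSize = Snappy.compress(page, compressed);
            System.out.println("compressed size: " + compressedSize + " bytes");
          }
        }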
---
 .../columnar/DummyBlockIndexerStorage.java         |  60 +++
 .../datastore/compression/AbstractCompressor.java  |  36 +-
 .../core/datastore/compression/Compressor.java     |  15 +-
 .../core/datastore/compression/GzipCompressor.java |  67 +++-
 .../datastore/compression/SnappyCompressor.java    |  71 +---
 .../core/datastore/compression/ZstdCompressor.java |  16 +-
 .../carbondata/core/datastore/page/ColumnPage.java |  61 +--
 .../core/datastore/page/DecimalColumnPage.java     |   2 +-
 .../datastore/page/LVByteBufferColumnPage.java     | 420 +++++++++++++++++++++
 .../core/datastore/page/LocalDictColumnPage.java   |  28 +-
 .../datastore/page/UnsafeFixLengthColumnPage.java  |  12 +
 .../datastore/page/UnsafeVarLengthColumnPage.java  |   2 +-
 .../page/UnsafeVarLengthColumnPageBase.java        |  60 +++
 .../datastore/page/VarLengthColumnPageBase.java    |  36 +-
 .../datastore/page/encoding/ColumnPageEncoder.java |   8 +-
 .../page/encoding/DefaultEncodingFactory.java      |   4 +-
 .../datastore/page/encoding/EncodedColumnPage.java |   8 +-
 .../page/encoding/adaptive/AdaptiveCodec.java      |  13 +-
 .../adaptive/AdaptiveDeltaFloatingCodec.java       |   9 +-
 .../adaptive/AdaptiveDeltaIntegralCodec.java       |   9 +-
 .../encoding/adaptive/AdaptiveFloatingCodec.java   |   9 +-
 .../encoding/adaptive/AdaptiveIntegralCodec.java   |   9 +-
 .../encoding/compress/DirectCompressCodec.java     |   3 +-
 .../legacy/ComplexDimensionIndexCodec.java         |   3 +-
 .../dimension/legacy/DictDimensionIndexCodec.java  |  77 ----
 .../dimension/legacy/IndexStorageEncoder.java      |  37 +-
 ...dexCodec.java => PlainDimensionIndexCodec.java} |  29 +-
 .../core/datastore/page/encoding/rle/RLECodec.java |   5 +-
 .../statistics/LVLongStringStatsCollector.java     |  51 ---
 .../statistics/LVShortStringStatsCollector.java    |  50 ---
 ...atsCollector.java => StringStatsCollector.java} |  22 +-
 .../core/indexstore/ExtendedBlockletWrapper.java   |   3 +-
 .../carbondata/core/util/BlockletDataMapUtil.java  |   6 +-
 .../hadoop/ft/CarbonTableInputFormatTest.java      |   1 +
 .../dataload/TestLoadDataWithCompression.scala     |  16 +-
 .../testsuite/datamap/CGDataMapTestCase.scala      |   4 +-
 .../testsuite/datamap/FGDataMapTestCase.scala      |  10 +-
 .../sortexpr/AllDataTypesTestCaseSort.scala        |  57 +++
 .../carbondata/processing/store/TablePage.java     |  24 +-
 .../carbondata/sdk/file/CarbonWriterBuilder.java   |   6 +-
 40 files changed, 908 insertions(+), 451 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/DummyBlockIndexerStorage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/DummyBlockIndexerStorage.java
new file mode 100644
index 0000000..4049a41
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/DummyBlockIndexerStorage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.columnar;
+
+/**
+ * This is for all dimensions except DATE
+ * TODO: refactor to use 'encode by meta' without going through BlockIndexerStorage
+ */
+public class DummyBlockIndexerStorage extends BlockIndexerStorage<byte[][]> {
+
+  @Override
+  public short[] getRowIdPage() {
+    return new short[0];
+  }
+
+  @Override
+  public int getRowIdPageLengthInBytes() {
+    return 0;
+  }
+
+  @Override
+  public short[] getRowIdRlePage() {
+    return new short[0];
+  }
+
+  @Override
+  public int getRowIdRlePageLengthInBytes() {
+    return 0;
+  }
+
+  @Override
+  public byte[][] getDataPage() {
+    return new byte[0][];
+  }
+
+  @Override
+  public short[] getDataRlePage() {
+    return new short[0];
+  }
+
+  @Override
+  public int getDataRlePageLengthInBytes() {
+    return 0;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
index 7db09d1..12bc2d5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
@@ -31,10 +31,11 @@ import org.apache.carbondata.core.util.ByteUtil;
 public abstract class AbstractCompressor implements Compressor {
 
   @Override
-  public byte[] compressShort(short[] unCompInput) {
-    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * 
ByteUtil.SIZEOF_SHORT);
+  public ByteBuffer compressShort(short[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocateDirect(unCompInput.length * 
ByteUtil.SIZEOF_SHORT);
     
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().put(unCompInput);
-    return compressByte(unCompBuffer.array());
+    unCompBuffer.position(unCompBuffer.limit());
+    return compressByte(unCompBuffer);
   }
 
   @Override
@@ -48,10 +49,11 @@ public abstract class AbstractCompressor implements 
Compressor {
   }
 
   @Override
-  public byte[] compressInt(int[] unCompInput) {
-    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * 
ByteUtil.SIZEOF_INT);
+  public ByteBuffer compressInt(int[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocateDirect(unCompInput.length * 
ByteUtil.SIZEOF_INT);
     unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put(unCompInput);
-    return compressByte(unCompBuffer.array());
+    unCompBuffer.position(unCompBuffer.limit());
+    return compressByte(unCompBuffer);
   }
 
   @Override
@@ -65,10 +67,11 @@ public abstract class AbstractCompressor implements 
Compressor {
   }
 
   @Override
-  public byte[] compressLong(long[] unCompInput) {
-    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * 
ByteUtil.SIZEOF_LONG);
+  public ByteBuffer compressLong(long[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocateDirect(unCompInput.length * 
ByteUtil.SIZEOF_LONG);
     
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asLongBuffer().put(unCompInput);
-    return compressByte(unCompBuffer.array());
+    unCompBuffer.position(unCompBuffer.limit());
+    return compressByte(unCompBuffer);
   }
 
   @Override
@@ -82,10 +85,11 @@ public abstract class AbstractCompressor implements 
Compressor {
   }
 
   @Override
-  public byte[] compressFloat(float[] unCompInput) {
-    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * 
ByteUtil.SIZEOF_FLOAT);
+  public ByteBuffer compressFloat(float[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocateDirect(unCompInput.length * 
ByteUtil.SIZEOF_FLOAT);
     
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asFloatBuffer().put(unCompInput);
-    return compressByte(unCompBuffer.array());
+    unCompBuffer.position(unCompBuffer.limit());
+    return compressByte(unCompBuffer);
   }
 
   @Override
@@ -99,10 +103,12 @@ public abstract class AbstractCompressor implements 
Compressor {
   }
 
   @Override
-  public byte[] compressDouble(double[] unCompInput) {
-    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * 
ByteUtil.SIZEOF_DOUBLE);
+  public ByteBuffer compressDouble(double[] unCompInput) {
+    ByteBuffer unCompBuffer =
+        ByteBuffer.allocateDirect(unCompInput.length * ByteUtil.SIZEOF_DOUBLE);
     
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asDoubleBuffer().put(unCompInput);
-    return compressByte(unCompBuffer.array());
+    unCompBuffer.position(unCompBuffer.limit());
+    return compressByte(unCompBuffer);
   }
 
   @Override
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
index a48034f..24b68f7 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
@@ -18,12 +18,15 @@
 package org.apache.carbondata.core.datastore.compression;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 public interface Compressor {
 
   String getName();
 
-  byte[] compressByte(byte[] unCompInput);
+  ByteBuffer compressByte(ByteBuffer compInput);
+
+  ByteBuffer compressByte(byte[] unCompInput);
 
   byte[] compressByte(byte[] unCompInput, int byteSize);
 
@@ -31,23 +34,23 @@ public interface Compressor {
 
   byte[] unCompressByte(byte[] compInput, int offset, int length);
 
-  byte[] compressShort(short[] unCompInput);
+  ByteBuffer compressShort(short[] unCompInput);
 
   short[] unCompressShort(byte[] compInput, int offset, int length);
 
-  byte[] compressInt(int[] unCompInput);
+  ByteBuffer compressInt(int[] unCompInput);
 
   int[] unCompressInt(byte[] compInput, int offset, int length);
 
-  byte[] compressLong(long[] unCompInput);
+  ByteBuffer compressLong(long[] unCompInput);
 
   long[] unCompressLong(byte[] compInput, int offset, int length);
 
-  byte[] compressFloat(float[] unCompInput);
+  ByteBuffer compressFloat(float[] unCompInput);
 
   float[] unCompressFloat(byte[] compInput, int offset, int length);
 
-  byte[] compressDouble(double[] unCompInput);
+  ByteBuffer compressDouble(double[] unCompInput);
 
   double[] unCompressDouble(byte[] compInput, int offset, int length);
 
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/GzipCompressor.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/GzipCompressor.java
index a718f9c..390029a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/GzipCompressor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/GzipCompressor.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.compression;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
@@ -34,33 +35,49 @@ public class GzipCompressor extends AbstractCompressor {
     return "gzip";
   }
 
+  private byte[] compressData(byte[] data) {
+    return compressData(data, 0, data.length);
+  }
+
   /**
    * This method takes the Byte Array data and Compresses in gzip format
    *
    * @param data Data Byte Array passed for compression
    * @return Compressed Byte Array
    */
-  private byte[] compressData(byte[] data) {
-    int initialSize = (data.length / 2) == 0 ? data.length : data.length / 2;
-    ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream(initialSize);
-    try {
-      GzipCompressorOutputStream gzipCompressorOutputStream =
-          new GzipCompressorOutputStream(byteArrayOutputStream);
-      try {
-        /**
-         * Below api will write bytes from specified byte array to the 
gzipCompressorOutputStream
-         * The output stream will compress the given byte array.
-         */
-        gzipCompressorOutputStream.write(data);
-      } catch (IOException e) {
-        throw new RuntimeException("Error during Compression writing step ", 
e);
-      } finally {
-        gzipCompressorOutputStream.close();
+  private byte[] compressData(byte[] data, int offset, int length) {
+    int initialSize = (length / 2) == 0 ? length : length / 2;
+    ByteArrayOutputStream output = new ByteArrayOutputStream(initialSize);
+    try (GzipCompressorOutputStream stream = new 
GzipCompressorOutputStream(output)) {
+      /*
+       * Below api will write bytes from specified byte array to the 
gzipCompressorOutputStream
+       * The output stream will compress the given byte array.
+       */
+      stream.write(data, offset, length);
+    } catch (IOException e) {
+      throw new RuntimeException("Error during Compression writing step ", e);
+    }
+    return output.toByteArray();
+  }
+
+  /**
+   * This method takes the ByteBuffer data and compresses it in gzip format
+   *
+   * @param input compression input
+   * @return Compressed Byte Array
+   */
+  private byte[] compressData(ByteBuffer input) {
+    input.flip();
+    int initialSize = (input.limit() / 2) == 0 ? input.limit() : input.limit() 
/ 2;
+    ByteArrayOutputStream output = new ByteArrayOutputStream(initialSize);
+    try (GzipCompressorOutputStream stream = new 
GzipCompressorOutputStream(output)) {
+      for (int i = 0; i < input.limit(); i++) {
+        stream.write(input.get(i));
       }
     } catch (IOException e) {
-      throw new RuntimeException("Error during Compression step ", e);
+      throw new RuntimeException("Error during Compression writing step ", e);
     }
-    return byteArrayOutputStream.toByteArray();
+    return output.toByteArray();
   }
 
   /**
@@ -94,8 +111,18 @@ public class GzipCompressor extends AbstractCompressor {
   }
 
   @Override
-  public byte[] compressByte(byte[] unCompInput) {
-    return compressData(unCompInput);
+  public ByteBuffer compressByte(ByteBuffer compInput) {
+    if (compInput.isDirect()) {
+      return ByteBuffer.wrap(compressData(compInput));
+    } else {
+      byte[] output = compressData(compInput.array(), 0, compInput.position());
+      return ByteBuffer.wrap(output);
+    }
+  }
+
+  @Override
+  public ByteBuffer compressByte(byte[] unCompInput) {
+    return ByteBuffer.wrap(compressData(unCompInput));
   }
 
   @Override
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
index ea3f72b..99ee9be 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.compression;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 
@@ -58,9 +59,25 @@ public class SnappyCompressor extends AbstractCompressor {
   }
 
   @Override
-  public byte[] compressByte(byte[] unCompInput) {
+  public ByteBuffer compressByte(ByteBuffer compInput) {
+    int inputLength = compInput.position();
+    ByteBuffer output = 
ByteBuffer.allocateDirect(Snappy.maxCompressedLength(inputLength));
+    int outputSize;
+    compInput.flip();
     try {
-      return Snappy.rawCompress(unCompInput, unCompInput.length);
+      outputSize = Snappy.compress(compInput, output);
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage(), e);
+      throw new RuntimeException(e);
+    }
+    output.limit(outputSize);
+    return output;
+  }
+
+  @Override
+  public ByteBuffer compressByte(byte[] unCompInput) {
+    try {
+      return ByteBuffer.wrap(Snappy.rawCompress(unCompInput, 
unCompInput.length));
     } catch (IOException e) {
       LOGGER.error(e.getMessage(), e);
       throw new RuntimeException(e);
@@ -103,16 +120,6 @@ public class SnappyCompressor extends AbstractCompressor {
   }
 
   @Override
-  public byte[] compressShort(short[] unCompInput) {
-    try {
-      return Snappy.compress(unCompInput);
-    } catch (IOException e) {
-      LOGGER.error(e.getMessage(), e);
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
   public short[] unCompressShort(byte[] compInput, int offset, int length) {
     try {
       return Snappy.uncompressShortArray(compInput, offset, length);
@@ -123,16 +130,6 @@ public class SnappyCompressor extends AbstractCompressor {
   }
 
   @Override
-  public byte[] compressInt(int[] unCompInput) {
-    try {
-      return Snappy.compress(unCompInput);
-    } catch (IOException e) {
-      LOGGER.error(e.getMessage(), e);
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
   public int[] unCompressInt(byte[] compInput, int offset, int length) {
     try {
       return Snappy.uncompressIntArray(compInput, offset, length);
@@ -143,16 +140,6 @@ public class SnappyCompressor extends AbstractCompressor {
   }
 
   @Override
-  public byte[] compressLong(long[] unCompInput) {
-    try {
-      return Snappy.compress(unCompInput);
-    } catch (IOException e) {
-      LOGGER.error(e.getMessage(), e);
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
   public long[] unCompressLong(byte[] compInput, int offset, int length) {
     try {
       return Snappy.uncompressLongArray(compInput, offset, length);
@@ -163,16 +150,6 @@ public class SnappyCompressor extends AbstractCompressor {
   }
 
   @Override
-  public byte[] compressFloat(float[] unCompInput) {
-    try {
-      return Snappy.compress(unCompInput);
-    } catch (IOException e) {
-      LOGGER.error(e.getMessage(), e);
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
   public float[] unCompressFloat(byte[] compInput, int offset, int length) {
     try {
       return Snappy.uncompressFloatArray(compInput, offset, length);
@@ -183,16 +160,6 @@ public class SnappyCompressor extends AbstractCompressor {
   }
 
   @Override
-  public byte[] compressDouble(double[] unCompInput) {
-    try {
-      return Snappy.compress(unCompInput);
-    } catch (IOException e) {
-      LOGGER.error(e.getMessage(), e);
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
   public double[] unCompressDouble(byte[] compInput, int offset, int length) {
     try {
       int uncompressedLength = Snappy.uncompressedLength(compInput, offset, 
length);
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
index 5de4cf5..139640f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.datastore.compression;
 
+import java.nio.ByteBuffer;
+
 import com.github.luben.zstd.Zstd;
 
 public class ZstdCompressor extends AbstractCompressor {
@@ -31,8 +33,18 @@ public class ZstdCompressor extends AbstractCompressor {
   }
 
   @Override
-  public byte[] compressByte(byte[] unCompInput) {
-    return Zstd.compress(unCompInput, COMPRESS_LEVEL);
+  public ByteBuffer compressByte(ByteBuffer compInput) {
+    compInput.flip();
+    if (compInput.isDirect()) {
+      return Zstd.compress(compInput, COMPRESS_LEVEL);
+    } else {
+      return ByteBuffer.wrap(Zstd.compress(compInput.array(), COMPRESS_LEVEL));
+    }
+  }
+
+  @Override
+  public ByteBuffer compressByte(byte[] unCompInput) {
+    return ByteBuffer.wrap(Zstd.compress(unCompInput, COMPRESS_LEVEL));
   }
 
   @Override
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 8aca86d..fa71681 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.BitSet;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -146,7 +147,14 @@ public abstract class ColumnPage {
     ColumnPage actualPage;
     ColumnPage encodedPage;
     if (isUnsafeEnabled(columnPageEncoderMeta)) {
-      actualPage = new UnsafeVarLengthColumnPage(columnPageEncoderMeta, 
pageSize);
+      DataType dataType = columnPageEncoderMeta.getStoreDataType();
+      if (dataType == DataTypes.STRING ||
+          dataType == DataTypes.VARCHAR ||
+          dataType == DataTypes.BINARY) {
+        actualPage = new LVByteBufferColumnPage(columnPageEncoderMeta, 
pageSize);
+      } else {
+        actualPage = new UnsafeVarLengthColumnPage(columnPageEncoderMeta, 
pageSize);
+      }
       encodedPage = new UnsafeFixLengthColumnPage(
           new ColumnPageEncoderMeta(columnPageEncoderMeta.getColumnSpec(), 
BYTE_ARRAY,
               columnPageEncoderMeta.getCompressorName()),
@@ -182,20 +190,18 @@ public abstract class ColumnPage {
           dataType == LONG ||
           dataType == DataTypes.FLOAT ||
           dataType == DataTypes.DOUBLE) {
-        instance = new UnsafeFixLengthColumnPage(
-            new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), 
pageSize);
+        instance = new UnsafeFixLengthColumnPage(columnPageEncoderMeta, 
pageSize);
       } else if (dataType == DataTypes.TIMESTAMP) {
         instance = new UnsafeFixLengthColumnPage(
             new ColumnPageEncoderMeta(columnSpec, LONG, compressorName), 
pageSize);
       } else if (DataTypes.isDecimal(dataType)) {
-        instance = new UnsafeDecimalColumnPage(
-            new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), 
pageSize);
-      } else if (dataType == DataTypes.STRING
-          || dataType == BYTE_ARRAY
-          || dataType == DataTypes.VARCHAR
-          || dataType == DataTypes.BINARY) {
-        instance = new UnsafeVarLengthColumnPage(
-            new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), 
pageSize);
+        instance = new UnsafeDecimalColumnPage(columnPageEncoderMeta, 
pageSize);
+      } else if (dataType == DataTypes.STRING ||
+          dataType == DataTypes.VARCHAR ||
+          dataType == DataTypes.BINARY) {
+        instance = new LVByteBufferColumnPage(columnPageEncoderMeta, pageSize);
+      } else if (dataType == BYTE_ARRAY) {
+        instance = new UnsafeVarLengthColumnPage(columnPageEncoderMeta, 
pageSize);
       } else {
         throw new RuntimeException("Unsupported data dataType: " + dataType);
       }
@@ -218,10 +224,11 @@ public abstract class ColumnPage {
         instance = newDoublePage(columnPageEncoderMeta, new double[pageSize]);
       } else if (DataTypes.isDecimal(dataType)) {
         instance = newDecimalPage(columnPageEncoderMeta, new byte[pageSize][]);
-      } else if (dataType == DataTypes.STRING
-          || dataType == BYTE_ARRAY
-          || dataType == DataTypes.VARCHAR
-          || dataType == DataTypes.BINARY) {
+      } else if (dataType == DataTypes.STRING ||
+          dataType == DataTypes.VARCHAR ||
+          dataType == DataTypes.BINARY) {
+        instance = new LVByteBufferColumnPage(columnPageEncoderMeta, pageSize);
+      } else if (dataType == BYTE_ARRAY) {
         instance = new SafeVarLengthColumnPage(columnPageEncoderMeta, 
pageSize);
       } else {
         throw new RuntimeException("Unsupported data dataType: " + dataType);
@@ -230,14 +237,6 @@ public abstract class ColumnPage {
     return instance;
   }
 
-  public static ColumnPage wrapByteArrayPage(TableSpec.ColumnSpec columnSpec, 
byte[][] byteArray,
-      String compressorName) {
-    ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, BYTE_ARRAY, compressorName), 
byteArray.length);
-    columnPage.setByteArrayPage(byteArray);
-    return columnPage;
-  }
-
   private static ColumnPage newBytePage(ColumnPageEncoderMeta meta, byte[] 
byteData) {
     ColumnPageEncoderMeta encoderMeta =
         new ColumnPageEncoderMeta(meta.getColumnSpec(), BYTE, 
meta.getCompressorName());
@@ -698,10 +697,6 @@ public abstract class ColumnPage {
    */
   public abstract void convertValue(ColumnPageValueConverter codec);
 
-  public PageLevelDictionary getPageDictionary() {
-    throw new UnsupportedOperationException("Operation Not Supported");
-  }
-
   /**
    * Return total page data length in bytes
    */
@@ -745,9 +740,11 @@ public abstract class ColumnPage {
   /**
    * Compress page data using specified compressor
    */
-  public byte[] compress(Compressor compressor) throws IOException {
+  public ByteBuffer compress(Compressor compressor) throws IOException {
     DataType dataType = columnPageEncoderMeta.getStoreDataType();
-    if (dataType == DataTypes.BOOLEAN) {
+    if (dataType == DataTypes.STRING) {
+      return compressor.compressByte(getByteBuffer());
+    } else if (dataType == DataTypes.BOOLEAN) {
       return compressor.compressByte(getBooleanPage());
     } else if (dataType == BYTE) {
       return compressor.compressByte(getBytePage());
@@ -776,7 +773,7 @@ public abstract class ColumnPage {
         || columnPageEncoderMeta.getColumnSpec().getColumnType() == 
ColumnType.PLAIN_VALUE)) {
       return compressor.compressByte(getComplexParentFlattenedBytePage());
     } else if (dataType == DataTypes.BINARY) {
-      return getLVFlattenedBytePage();
+      return ByteBuffer.wrap(getLVFlattenedBytePage());
     } else if (dataType == BYTE_ARRAY) {
       return compressor.compressByte(getLVFlattenedBytePage());
     } else {
@@ -939,4 +936,8 @@ public abstract class ColumnPage {
   public ColumnPageEncoderMeta getColumnPageEncoderMeta() {
     return columnPageEncoderMeta;
   }
+
+  public ByteBuffer getByteBuffer() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
index 6c19f3f..60d0fac 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
@@ -26,7 +26,7 @@ import 
org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 /**
  * Represent a columnar data in one page for one column of decimal data type
  */
-public abstract class DecimalColumnPage extends VarLengthColumnPageBase {
+public abstract class DecimalColumnPage extends UnsafeVarLengthColumnPageBase {
 
   /**
    * decimal converter instance
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/LVByteBufferColumnPage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/LVByteBufferColumnPage.java
new file mode 100644
index 0000000..29135af
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/LVByteBufferColumnPage.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.ColumnType;
+import org.apache.carbondata.core.datastore.TableSpec;
+import 
org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+
+import static 
org.apache.carbondata.core.datastore.page.VarLengthColumnPageBase.DEFAULT_ROW_SIZE;
+import static 
org.apache.carbondata.core.datastore.page.VarLengthColumnPageBase.FACTOR;
+
+import sun.nio.ch.DirectBuffer;
+
+/**
+ * ColumnPage implementation backed by a ByteBuffer
+ * All data added in this page is encoded as LV (Length-Value)
+ */
+public class LVByteBufferColumnPage extends ColumnPage {
+
+  // data of this page
+  private ByteBuffer byteBuffer;
+
+  // the offset of each row in the page data; its size is pageSize + 1
+  protected ColumnPage rowOffset;
+
+  // number of rows in this page
+  private int numRows;
+
+  // the length of bytes added in the page
+  protected int totalLength;
+
+  LVByteBufferColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int 
pageSize) {
+    super(columnPageEncoderMeta, pageSize);
+    checkDataType(columnPageEncoderMeta);
+    TableSpec.ColumnSpec spec = TableSpec.ColumnSpec.newInstance(
+        columnPageEncoderMeta.getColumnSpec().getFieldName(), DataTypes.INT, 
ColumnType.MEASURE);
+    rowOffset = ColumnPage.newPage(
+        new ColumnPageEncoderMeta(spec, DataTypes.INT, 
columnPageEncoderMeta.getCompressorName()),
+        pageSize + 1);
+    byteBuffer = ByteBuffer.allocateDirect((int)(pageSize * DEFAULT_ROW_SIZE * 
FACTOR));
+    numRows = 0;
+    totalLength = 0;
+  }
+
+  private void checkDataType(ColumnPageEncoderMeta columnPageEncoderMeta) {
+    DataType dataType = columnPageEncoderMeta.getStoreDataType();
+    if (dataType != DataTypes.STRING &&
+        dataType != DataTypes.VARCHAR &&
+        dataType != DataTypes.BINARY) {
+      throw new UnsupportedOperationException("Unsupported data type: " + 
dataType);
+    }
+  }
+
+  /**
+   * This putBytes differs from the others in that it writes LV (Length-Value) encoded data.
+   * The width of the L part depends on the data type.
+   */
+  @Override
+  public void putBytes(int rowId, byte[] bytes) {
+    // since it is variable length, we need to prepare each element as an
+    // LV byte array (the first two/four bytes are the length of the value)
+    int requiredLength;
+    DataType dataType = getDataType();
+    if (dataType == DataTypes.STRING) {
+      requiredLength = bytes.length + CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
+      ensureMaxLengthForString(requiredLength);
+    } else if (dataType == DataTypes.VARCHAR || dataType == DataTypes.BINARY) {
+      requiredLength = bytes.length + CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    } else {
+      throw new UnsupportedOperationException("unsupported data type: " + 
dataType);
+    }
+
+    // ensure the byte buffer has enough capacity
+    ensureMemory(requiredLength);
+
+    if (dataType == DataTypes.STRING) {
+      byteBuffer.putShort((short)bytes.length);
+    } else {
+      byteBuffer.putInt(bytes.length);
+    }
+    byteBuffer.put(bytes);
+
+    if (rowId == 0) {
+      rowOffset.putInt(0, 0);
+    }
+    rowOffset.putInt(rowId + 1, rowOffset.getInt(rowId) + requiredLength);
+    totalLength += requiredLength;
+    numRows++;
+  }
+
+  /**
+   * reallocate the byte buffer if its capacity is less than current size + request size
+   */
+  protected void ensureMemory(int requestSize) {
+    int capacity = byteBuffer.capacity();
+    if (totalLength + requestSize > capacity) {
+      int newSize = Math.max(2 * capacity, totalLength + requestSize);
+      ByteBuffer newBuffer = ByteBuffer.allocateDirect(newSize);
+      CarbonUnsafe.getUnsafe().copyMemory(((DirectBuffer)byteBuffer).address(),
+          ((DirectBuffer)newBuffer).address(), capacity);
+      newBuffer.position(byteBuffer.position());
+      byteBuffer = newBuffer;
+    }
+  }
+
+  // since we later store a column page in a byte array, its maximum size is 2GB
+  private void ensureMaxLengthForString(int requiredLength) {
+    if (requiredLength > Short.MAX_VALUE) {
+      throw new RuntimeException("input data length " + requiredLength +
+          " bytes too long, maximum length supported is " + Short.MAX_VALUE + 
" bytes");
+    }
+  }
+
+  @Override
+  public ByteBuffer getByteBuffer() {
+    return byteBuffer;
+  }
+
+  @Override
+  public byte[][] getByteArrayPage() {
+    byte[][] output = new byte[numRows][];
+    ByteBuffer buffer = byteBuffer.asReadOnlyBuffer();
+    for (int rowId = 0; rowId < numRows; rowId++) {
+      int offset = rowOffset.getInt(rowId);
+      int length = rowOffset.getInt(rowId + 1) - offset;
+      buffer.position(offset);
+      output[rowId] = new byte[length];
+      buffer.get(output[rowId]);
+    }
+    return output;
+  }
+
+  @Override
+  public byte[] getLVFlattenedBytePage() {
+    // output LV encoded byte array
+    byte[] data = new byte[byteBuffer.position()];
+    ByteBuffer buffer = byteBuffer.asReadOnlyBuffer();
+    buffer.flip();
+    buffer.get(data);
+    return data;
+  }
+
+  @Override
+  public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) {
+    // output LV encoded byte array
+    byte[] data = new byte[byteBuffer.position()];
+    ByteBuffer buffer = byteBuffer.asReadOnlyBuffer();
+    buffer.flip();
+    buffer.get(data);
+    return data;
+  }
+
+  @Override
+  public byte[] getComplexParentFlattenedBytePage() {
+    int outputOffset = 0;
+    byte[] output = new byte[totalLength];
+    ByteBuffer buffer = byteBuffer.asReadOnlyBuffer();
+    buffer.flip();
+    for (int rowId = 0; rowId < numRows; rowId++) {
+      int offset = rowOffset.getInt(rowId);
+      int length = rowOffset.getInt(rowId + 1) - rowOffset.getInt(rowId);
+      int readLength;
+      DataType dataType = getDataType();
+      if (dataType == DataTypes.STRING) {
+        buffer.position(offset + CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+        readLength = length - CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
+      } else {
+        buffer.position(offset + CarbonCommonConstants.INT_SIZE_IN_BYTE);
+        readLength = length - CarbonCommonConstants.INT_SIZE_IN_BYTE;
+      }
+      buffer.get(output, outputOffset, readLength);
+      outputOffset += readLength;
+    }
+    return output;
+  }
+
+  @Override
+  public long getPageLengthInBytes() {
+    return totalLength;
+  }
+
+  @Override
+  public void convertValue(ColumnPageValueConverter codec) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public void setBytePage(byte[] byteData) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public void setShortPage(short[] shortData) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public void setShortIntPage(byte[] shortIntData) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public void setIntPage(int[] intData) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public void setLongPage(long[] longData) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public void setFloatPage(float[] floatData) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public void setDoublePage(double[] doubleData) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public int getShortInt(int rowId) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public float getFloat(int rowId) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public double getDouble(int rowId) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public byte[] getBytePage() {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public short[] getShortPage() {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public byte[] getShortIntPage() {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public int[] getIntPage() {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public long[] getLongPage() {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public float[] getFloatPage() {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public double[] getDoublePage() {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public byte[] getDecimalPage() {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public byte[] getBytes(int rowId) {
+    int offset = rowOffset.getInt(rowId);
+    int length = rowOffset.getInt(rowId + 1) - rowOffset.getInt(rowId);
+    byte[] data = new byte[length];
+    ByteBuffer duplicated = byteBuffer.asReadOnlyBuffer();
+    duplicated.position(offset);
+    duplicated.limit(offset + length);
+    duplicated.get(data);
+    return data;
+  }
+
+  @Override
+  public void putDecimal(int rowId, BigDecimal decimal) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override
+  public BigDecimal getDecimal(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override
+  public void setByteArrayPage(byte[][] byteArray) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override
+  public void freeMemory() {
+
+  }
+
+  @Override
+  public void putByte(int rowId, byte value) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public void putShort(int rowId, short value) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public void putShortInt(int rowId, int value) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public void putInt(int rowId, int value) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public void putLong(int rowId, long value) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public void putDouble(int rowId, double value) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
+  @Override
+  public void putFloat(int rowId, float value) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+}
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
index 36edce2..797dd11 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -28,6 +29,7 @@ import 
org.apache.carbondata.core.localdictionary.PageLevelDictionary;
 import 
org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException;
 import 
org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 
 import org.apache.log4j.Logger;
 
@@ -105,6 +107,15 @@ public class LocalDictColumnPage extends ColumnPage {
     }
   }
 
+  @Override
+  public ByteBuffer getByteBuffer() {
+    if (null != pageLevelDictionary) {
+      return encodedDataColumnPage.getByteBuffer();
+    } else {
+      return actualDataColumnPage.getByteBuffer();
+    }
+  }
+
   /**
    * Below method will be used to check whether page is local dictionary
    * generated or not. This will be used while encoding the page
@@ -126,7 +137,22 @@ public class LocalDictColumnPage extends ColumnPage {
     if (null != pageLevelDictionary) {
       try {
         actualDataColumnPage.putBytes(rowId, bytes);
-        dummyKey[0] = pageLevelDictionary.getDictionaryValue(bytes);
+        byte[] input;
+        DataType dataType = 
actualDataColumnPage.columnPageEncoderMeta.getStoreDataType();
+        if (dataType == DataTypes.STRING) {
+          ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length + 2);
+          byteBuffer.putShort((short) bytes.length);
+          byteBuffer.put(bytes);
+          input = byteBuffer.array();
+        } else if (dataType == DataTypes.VARCHAR || dataType == 
DataTypes.BINARY) {
+          ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length + 4);
+          byteBuffer.putInt(bytes.length);
+          byteBuffer.put(bytes);
+          input = byteBuffer.array();
+        } else {
+          input = bytes;
+        }
+        dummyKey[0] = pageLevelDictionary.getDictionaryValue(input);
         encodedDataColumnPage.putBytes(rowId, 
keyGenerator.generateKey(dummyKey));
       } catch (DictionaryThresholdReachedException e) {
         LOGGER.warn("Local Dictionary threshold reached for the column: " + 
actualDataColumnPage
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index 47b48e5..9a1fac2 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.datastore.page;
 
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 
 import 
org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
@@ -28,6 +29,8 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
+import sun.nio.ch.DirectBuffer;
+
 // This extension uses unsafe memory to store page data, for fix length data 
type only (byte,
 // short, integer, long, float, double)
 public class UnsafeFixLengthColumnPage extends ColumnPage {
@@ -357,6 +360,15 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   }
 
   @Override
+  public ByteBuffer getByteBuffer() {
+    int numRow = getEndLoop();
+    ByteBuffer out = ByteBuffer.allocateDirect(numRow * eachRowSize);
+    CarbonUnsafe.getUnsafe().copyMemory(
+        memoryBlock.getBaseOffset(), ((DirectBuffer)out).address(), numRow * 
eachRowSize);
+    return out;
+  }
+
+  @Override
   public byte[] getLVFlattenedBytePage() {
     throw new UnsupportedOperationException(
         "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
index 2897043..ddee0a9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 /**
  * This extension uses unsafe memory to store page data, for variable length 
data type (string)
  */
-public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
+public class UnsafeVarLengthColumnPage extends UnsafeVarLengthColumnPageBase {
 
   /**
    * create a page
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPageBase.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPageBase.java
new file mode 100644
index 0000000..c8e58ea
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPageBase.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import 
org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+
+public abstract class UnsafeVarLengthColumnPageBase extends 
VarLengthColumnPageBase {
+
+  // memory allocated by Unsafe
+  protected MemoryBlock memoryBlock;
+
+  // base address of memoryBlock
+  protected Object baseAddress;
+
+  // base offset of memoryBlock
+  protected long baseOffset;
+
+  // size of the allocated memory, in bytes
+  protected int capacity;
+
+  UnsafeVarLengthColumnPageBase(ColumnPageEncoderMeta columnPageEncoderMeta, 
int pageSize) {
+    super(columnPageEncoderMeta, pageSize);
+  }
+
+  /**
+   * reallocate memory if capacity is less than current size + request size
+   */
+  protected void ensureMemory(int requestSize) {
+    if (totalLength + requestSize > capacity) {
+      int newSize = Math.max(2 * capacity, totalLength + requestSize);
+      MemoryBlock newBlock = 
UnsafeMemoryManager.allocateMemoryWithRetry(taskId, newSize);
+      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset,
+          newBlock.getBaseObject(), newBlock.getBaseOffset(), capacity);
+      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
+      memoryBlock = newBlock;
+      baseAddress = newBlock.getBaseObject();
+      baseOffset = newBlock.getBaseOffset();
+      capacity = newSize;
+    }
+  }
+
+}
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 36881df..fa1faa6 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -24,9 +24,6 @@ import 
org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.TableSpec;
 import 
org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.memory.MemoryBlock;
-import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
@@ -47,23 +44,11 @@ public abstract class VarLengthColumnPageBase extends 
ColumnPage {
 
   final String taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
 
-  // memory allocated by Unsafe
-  MemoryBlock memoryBlock;
-
-  // base address of memoryBlock
-  Object baseAddress;
-
   // the offset of row in the unsafe memory, its size is pageSize + 1
-  ColumnPage rowOffset;
+  protected ColumnPage rowOffset;
 
   // the length of bytes added in the page
-  int totalLength;
-
-  // base offset of memoryBlock
-  long baseOffset;
-
-  // size of the allocated memory, in bytes
-  int capacity;
+  protected int totalLength;
 
   VarLengthColumnPageBase(ColumnPageEncoderMeta columnPageEncoderMeta, int 
pageSize) {
     super(columnPageEncoderMeta, pageSize);
@@ -512,23 +497,6 @@ public abstract class VarLengthColumnPageBase extends 
ColumnPage {
   }
 
   /**
-   * reallocate memory if capacity length than current size + request size
-   */
-  protected void ensureMemory(int requestSize) {
-    if (totalLength + requestSize > capacity) {
-      int newSize = Math.max(2 * capacity, totalLength + requestSize);
-      MemoryBlock newBlock = 
UnsafeMemoryManager.allocateMemoryWithRetry(taskId, newSize);
-      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset,
-          newBlock.getBaseObject(), newBlock.getBaseOffset(), capacity);
-      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
-      memoryBlock = newBlock;
-      baseAddress = newBlock.getBaseObject();
-      baseOffset = newBlock.getBaseOffset();
-      capacity = newSize;
-    }
-  }
-
-  /**
    * free memory as needed
    */
   public void freeMemory() {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
index 81e4844..182d0d4 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
@@ -57,7 +57,7 @@ public abstract class ColumnPageEncoder {
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(ColumnPageEncoder.class.getName());
 
-  protected abstract byte[] encodeData(ColumnPage input) throws IOException;
+  protected abstract ByteBuffer encodeData(ColumnPage input) throws 
IOException;
 
   protected abstract List<Encoding> getEncodingList();
 
@@ -91,15 +91,15 @@ public abstract class ColumnPageEncoder {
    * The encoded binary data and metadata are wrapped in encoding column page
    */
   public EncodedColumnPage encode(ColumnPage inputPage) throws IOException {
-    byte[] encodedBytes = encodeData(inputPage);
+    ByteBuffer encodedBytes = encodeData(inputPage);
     DataChunk2 pageMetadata = buildPageMetadata(inputPage, encodedBytes);
     return new EncodedColumnPage(pageMetadata, encodedBytes, inputPage);
   }
 
-  private DataChunk2 buildPageMetadata(ColumnPage inputPage, byte[] 
encodedBytes)
+  private DataChunk2 buildPageMetadata(ColumnPage inputPage, ByteBuffer 
encodedBytes)
       throws IOException {
     DataChunk2 dataChunk = new DataChunk2();
-    dataChunk.setData_page_length(encodedBytes.length);
+    dataChunk.setData_page_length(encodedBytes.limit() - 
encodedBytes.position());
     fillBasicFields(inputPage, dataChunk);
     fillNullBitSet(inputPage, dataChunk);
     fillEncoding(inputPage, dataChunk);
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index 776ed1a..cdcd8e3 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -30,7 +30,7 @@ import 
org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveInteg
 import 
org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
 import 
org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.ComplexDimensionIndexCodec;
 import 
org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DirectDictDimensionIndexCodec;
-import 
org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.HighCardDictDimensionIndexCodec;
+import 
org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.PlainDimensionIndexCodec;
 import 
org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -94,7 +94,7 @@ public class DefaultEncodingFactory extends EncodingFactory {
             dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex())
             .createEncoder(null);
       case PLAIN_VALUE:
-        return new HighCardDictDimensionIndexCodec(dimensionSpec.isInSortColumns(),
+        return new PlainDimensionIndexCodec(dimensionSpec.isInSortColumns(),
             dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
             dimensionSpec.getSchemaDataType() == DataTypes.VARCHAR
                 || dimensionSpec.getSchemaDataType() == DataTypes.BINARY).createEncoder(null);
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
index 6f78d95..58f00a9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
@@ -32,7 +32,7 @@ import org.apache.carbondata.format.DataChunk2;
 public class EncodedColumnPage {
 
   // encoded and compressed column page data
-  protected final byte[] encodedData;
+  protected final ByteBuffer encodedData;
 
   // metadata of this page
   private DataChunk2 pageMetadata;
@@ -44,7 +44,7 @@ public class EncodedColumnPage {
    * @param pageMetadata metadata of the encoded page
    * @param encodedData encoded data for this page
    */
-  public EncodedColumnPage(DataChunk2 pageMetadata, byte[] encodedData,
+  public EncodedColumnPage(DataChunk2 pageMetadata, ByteBuffer encodedData,
       ColumnPage actualPage) {
     if (pageMetadata == null) {
       throw new IllegalArgumentException("data chunk2 must not be null");
@@ -61,7 +61,7 @@ public class EncodedColumnPage {
    * return the encoded data as ByteBuffer
    */
   public ByteBuffer getEncodedData() {
-    return ByteBuffer.wrap(encodedData);
+    return encodedData;
   }
 
   public DataChunk2 getPageMetadata() {
@@ -73,7 +73,7 @@ public class EncodedColumnPage {
    */
   public int getTotalSerializedSize() {
     int metadataSize = CarbonUtil.getByteArray(pageMetadata).length;
-    int dataSize = encodedData.length;
+    int dataSize = encodedData.limit() - encodedData.position();
     return metadataSize + dataSize;
   }
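
Note: getEncodedData() now hands out the internal buffer instead of wrapping a
fresh byte[], so callers share its position and limit. A standalone sketch of
reading through an independent cursor (duplicate() shares the bytes but not
the cursor; not CarbonData code):

    import java.nio.ByteBuffer;

    public class SharedBufferDemo {
      public static void main(String[] args) {
        ByteBuffer shared = ByteBuffer.wrap(new byte[] {10, 20, 30});
        ByteBuffer view = shared.duplicate();  // same bytes, independent position/limit
        view.get();                            // advances only the view
        System.out.println(view.position());   // 1
        System.out.println(shared.position()); // 0: the shared buffer is untouched
      }
    }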
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
index cebbc0f..17584e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorage;
@@ -157,11 +158,11 @@ public abstract class AdaptiveCodec implements ColumnPageCodec {
    * @param result
    * @throws IOException
    */
-  public byte[] writeInvertedIndexIfRequired(byte[] result) throws IOException {
+  public ByteBuffer writeInvertedIndexIfRequired(ByteBuffer result) throws IOException {
     ByteArrayOutputStream stream = new ByteArrayOutputStream();
     DataOutputStream out = new DataOutputStream(stream);
     if (null != indexStorage) {
-      out.write(result);
+      out.write(result.array(), 0, result.position());
       if (indexStorage.getRowIdPageLengthInBytes() > 0) {
         out.writeInt(indexStorage.getRowIdPageLengthInBytes());
         short[] rowIdPage = (short[]) indexStorage.getRowIdPage();
@@ -178,7 +179,7 @@ public abstract class AdaptiveCodec implements ColumnPageCodec {
     }
     byte[] bytes = stream.toByteArray();
     stream.close();
-    return bytes;
+    return ByteBuffer.wrap(bytes);
   }
 
   /**
@@ -187,7 +188,7 @@ public abstract class AdaptiveCodec implements ColumnPageCodec {
    * @param dataChunk
    * @param result
    */
-  public void fillLegacyFieldsIfRequired(DataChunk2 dataChunk, byte[] result) {
+  public void fillLegacyFieldsIfRequired(DataChunk2 dataChunk, ByteBuffer result) {
     if (null != indexStorage) {
       SortState sort = (indexStorage.getRowIdPageLengthInBytes() > 0) ?
           SortState.SORT_EXPLICIT :
@@ -203,7 +204,7 @@ public abstract class AdaptiveCodec implements ColumnPageCodec {
       dataChunk.setRowid_page_length(0);
     }
     if (null != result) {
-      dataChunk.setData_page_length(result.length);
+      dataChunk.setData_page_length(result.limit() - result.position());
     }
   }
 
@@ -227,7 +228,7 @@ public abstract class AdaptiveCodec implements ColumnPageCodec {
     }
   }
 
-  public byte[] encodeAndCompressPage(ColumnPage input, ColumnPageValueConverter converter,
+  public ByteBuffer encodeAndCompressPage(ColumnPage input, ColumnPageValueConverter converter,
       Compressor compressor) throws IOException {
     encodedPage = ColumnPage.newPage(
         new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(), targetDataType,
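
Note: writeInvertedIndexIfRequired() copies with out.write(result.array(), 0,
result.position()), which presumes a heap-backed buffer that was written into
and never flipped, so position() equals the byte count (a direct buffer's
array() would throw). A standalone sketch of that convention:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    public class UnflippedWriteDemo {
      public static void main(String[] args) throws IOException {
        ByteBuffer result = ByteBuffer.allocate(8);
        result.put(new byte[] {1, 2, 3});                 // position() is now 3
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(stream);
        out.write(result.array(), 0, result.position());  // copies exactly the 3 written bytes
        System.out.println(stream.size());                // 3
      }
    }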
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index 8579973..235deb8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
@@ -80,18 +81,18 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
     return new ColumnPageEncoder() {
-      byte[] result = null;
+      ByteBuffer result = null;
       @Override
-      protected byte[] encodeData(ColumnPage input) throws IOException {
+      protected ByteBuffer encodeData(ColumnPage input) throws IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
         Compressor compressor = CompressorFactory.getInstance().getCompressor(
             input.getColumnCompressorName());
         result = encodeAndCompressPage(input, converter, compressor);
-        byte[] bytes = writeInvertedIndexIfRequired(result);
+        ByteBuffer bytes = writeInvertedIndexIfRequired(result);
         encodedPage.freeMemory();
-        if (bytes.length != 0) {
+        if (bytes.limit() != 0) {
           return bytes;
         }
         return result;
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index 1a0645c..573c225 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
@@ -87,18 +88,18 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
     return new ColumnPageEncoder() {
-      byte[] result = null;
+      ByteBuffer result = null;
       @Override
-      protected byte[] encodeData(ColumnPage input) throws IOException {
+      protected ByteBuffer encodeData(ColumnPage input) throws IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
         Compressor compressor =
             CompressorFactory.getInstance().getCompressor(input.getColumnCompressorName());
         result = encodeAndCompressPage(input, converter, compressor);
-        byte[] bytes = writeInvertedIndexIfRequired(result);
+        ByteBuffer bytes = writeInvertedIndexIfRequired(result);
         encodedPage.freeMemory();
-        if (bytes.length != 0) {
+        if (bytes.limit() != 0) {
           return bytes;
         }
         return result;
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 818aa90..a4b83bf 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
@@ -68,18 +69,18 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
     return new ColumnPageEncoder() {
-      byte[] result = null;
+      ByteBuffer result = null;
       @Override
-      protected byte[] encodeData(ColumnPage input) throws IOException {
+      protected ByteBuffer encodeData(ColumnPage input) throws IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
         Compressor compressor =
             CompressorFactory.getInstance().getCompressor(input.getColumnCompressorName());
         result = encodeAndCompressPage(input, converter, compressor);
-        byte[] bytes = writeInvertedIndexIfRequired(result);
+        ByteBuffer bytes = writeInvertedIndexIfRequired(result);
         encodedPage.freeMemory();
-        if (bytes.length != 0) {
+        if (bytes.limit() != 0) {
           return bytes;
         }
         return result;
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index fb46b59..3554cd3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
@@ -65,18 +66,18 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
     return new ColumnPageEncoder() {
-      byte[] result = null;
+      ByteBuffer result = null;
       @Override
-      protected byte[] encodeData(ColumnPage input) throws IOException {
+      protected ByteBuffer encodeData(ColumnPage input) throws IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
         Compressor compressor =
             CompressorFactory.getInstance().getCompressor(input.getColumnCompressorName());
         result = encodeAndCompressPage(input, converter, compressor);
-        byte[] bytes = writeInvertedIndexIfRequired(result);
+        ByteBuffer bytes = writeInvertedIndexIfRequired(result);
         encodedPage.freeMemory();
-        if (bytes.length != 0) {
+        if (bytes.limit() != 0) {
           return bytes;
         }
         return result;
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index 52f42d3..5fff9c2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.datastore.page.encoding.compress;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
@@ -73,7 +74,7 @@ public class DirectCompressCodec implements ColumnPageCodec {
     return new ColumnPageEncoder() {
 
       @Override
-      protected byte[] encodeData(ColumnPage input) throws IOException {
+      protected ByteBuffer encodeData(ColumnPage input) throws IOException {
         Compressor compressor = CompressorFactory.getInstance().getCompressor(
             input.getColumnCompressorName());
         return input.compress(compressor);
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
index f232652..5a6d3c0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -51,7 +52,7 @@ public class ComplexDimensionIndexCodec extends IndexStorageCodec {
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
         Compressor compressor = CompressorFactory.getInstance().getCompressor(
             inputPage.getColumnCompressorName());
-        byte[] compressed = compressor.compressByte(flattened);
+        ByteBuffer compressed = compressor.compressByte(flattened);
         super.indexStorage = indexStorage;
         super.compressedDataPage = compressed;
       }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
deleted file mode 100644
index f3475fd..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorage;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
-import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.format.Encoding;
-
-public class DictDimensionIndexCodec extends IndexStorageCodec {
-
-  public DictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex) {
-    super(isSort, isInvertedIndex);
-  }
-
-  @Override
-  public String getName() {
-    return "DictDimensionIndexCodec";
-  }
-
-  @Override
-  public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
-    return new IndexStorageEncoder() {
-      @Override
-      void encodeIndexStorage(ColumnPage inputPage) {
-        BlockIndexerStorage<byte[][]> indexStorage;
-        byte[][] data = inputPage.getByteArrayPage();
-        if (isInvertedIndex) {
-          indexStorage = new BlockIndexerStorageForShort(data, true, false, isSort);
-        } else {
-          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
-        }
-        byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
-        Compressor compressor = CompressorFactory.getInstance().getCompressor(
-            inputPage.getColumnCompressorName());
-        super.compressedDataPage = compressor.compressByte(flattened);
-        super.indexStorage = indexStorage;
-      }
-
-      @Override
-      protected List<Encoding> getEncodingList() {
-        List<Encoding> encodings = new ArrayList<>();
-        encodings.add(Encoding.DICTIONARY);
-        encodings.add(Encoding.RLE);
-        if (isInvertedIndex) {
-          encodings.add(Encoding.INVERTED_INDEX);
-        }
-        return encodings;
-      }
-
-    };
-  }
-}
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageEncoder.java
index fe0d8a5..019ecbf 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageEncoder.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorage;
@@ -31,38 +32,48 @@ import org.apache.carbondata.format.SortState;
 
 public abstract class IndexStorageEncoder extends ColumnPageEncoder {
   BlockIndexerStorage indexStorage;
-  byte[] compressedDataPage;
+  ByteBuffer compressedDataPage;
 
   abstract void encodeIndexStorage(ColumnPage inputPage);
 
   @Override
-  protected byte[] encodeData(ColumnPage input) throws IOException {
+  protected ByteBuffer encodeData(ColumnPage input) throws IOException {
     encodeIndexStorage(input);
+    if (indexStorage.getRowIdPageLengthInBytes() > 0 ||
+        indexStorage.getDataRlePageLengthInBytes() > 0) {
+      return appendToCompressedData();
+    } else {
+      return compressedDataPage;
+    }
+  }
+
+  private ByteBuffer appendToCompressedData() throws IOException {
     ByteArrayOutputStream stream = new ByteArrayOutputStream();
     DataOutputStream out = new DataOutputStream(stream);
-    out.write(compressedDataPage);
+    ByteBuffer buffer = compressedDataPage.asReadOnlyBuffer();
+    int length = buffer.limit();
+    for (int i = 0; i < length; i++) {
+      out.writeByte(buffer.get(i));
+    }
     if (indexStorage.getRowIdPageLengthInBytes() > 0) {
       out.writeInt(indexStorage.getRowIdPageLengthInBytes());
-      short[] rowIdPage = (short[])indexStorage.getRowIdPage();
-      for (short rowId : rowIdPage) {
+      for (short rowId : indexStorage.getRowIdPage()) {
         out.writeShort(rowId);
       }
       if (indexStorage.getRowIdRlePageLengthInBytes() > 0) {
-        short[] rowIdRlePage = (short[])indexStorage.getRowIdRlePage();
-        for (short rowIdRle : rowIdRlePage) {
+        for (short rowIdRle : indexStorage.getRowIdRlePage()) {
           out.writeShort(rowIdRle);
         }
       }
     }
     if (indexStorage.getDataRlePageLengthInBytes() > 0) {
-      short[] dataRlePage = (short[])indexStorage.getDataRlePage();
-      for (short dataRle : dataRlePage) {
+      for (short dataRle : indexStorage.getDataRlePage()) {
         out.writeShort(dataRle);
       }
     }
-    byte[] result = stream.toByteArray();
-    stream.close();
-    return result;
+    byte[] output = stream.toByteArray();
+    out.close();
+    return ByteBuffer.wrap(output);
   }
 
   @Override
@@ -84,6 +95,6 @@ public abstract class IndexStorageEncoder extends ColumnPageEncoder {
     if (indexStorage.getDataRlePageLengthInBytes() > 0) {
       dataChunk.setRle_page_length(indexStorage.getDataRlePageLengthInBytes());
     }
-    dataChunk.setData_page_length(compressedDataPage.length);
+    dataChunk.setData_page_length(compressedDataPage.limit() - compressedDataPage.position());
   }
 }
\ No newline at end of file
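
Note: appendToCompressedData() above copies the page with one absolute get(i)
call per byte, which works for heap and direct buffers alike but is slow. A
standalone sketch of a bulk alternative over the same kind of read-only view:

    import java.io.ByteArrayOutputStream;
    import java.nio.ByteBuffer;

    public class BulkCopyDemo {
      public static void main(String[] args) {
        ByteBuffer page = ByteBuffer.wrap(new byte[] {5, 6, 7, 8});
        ByteBuffer view = page.asReadOnlyBuffer();  // independent cursor over the same bytes
        byte[] chunk = new byte[view.remaining()];
        view.get(chunk);                            // one bulk transfer instead of a per-byte loop
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        stream.write(chunk, 0, chunk.length);
        System.out.println(stream.size());          // 4
      }
    }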
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/PlainDimensionIndexCodec.java
similarity index 75%
rename from core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
rename to core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/PlainDimensionIndexCodec.java
index 7a1627c..e50a599 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/PlainDimensionIndexCodec.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -24,6 +25,7 @@ import java.util.Map;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorage;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.DummyBlockIndexerStorage;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -31,13 +33,13 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.Encoding;
 
-public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
+public class PlainDimensionIndexCodec extends IndexStorageCodec {
   /**
    * whether this column is varchar data type(long string)
    */
   private boolean isVarcharType;
 
-  public HighCardDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
+  public PlainDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
       boolean isVarcharType) {
     super(isSort, isInvertedIndex);
     this.isVarcharType = isVarcharType;
@@ -45,7 +47,7 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
 
   @Override
   public String getName() {
-    return "HighCardDictDimensionIndexCodec";
+    return "PlainDimensionIndexCodec";
   }
 
   @Override
@@ -55,18 +57,21 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
       @Override
       protected void encodeIndexStorage(ColumnPage input) {
         BlockIndexerStorage<byte[][]> indexStorage;
-        byte[][] data = input.getByteArrayPage();
         boolean isDictionary = input.isLocalDictGeneratedPage();
-        if (isInvertedIndex) {
-          indexStorage = new BlockIndexerStorageForShort(data, isDictionary, !isDictionary, isSort);
-        } else {
-          indexStorage =
-              new BlockIndexerStorageForNoInvertedIndexForShort(data, isDictionary);
-        }
-        byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
         Compressor compressor = CompressorFactory.getInstance().getCompressor(
             input.getColumnCompressorName());
-        super.compressedDataPage = compressor.compressByte(flattened);
+        if (isInvertedIndex || isDictionary) {
+          byte[][] byteArray = input.getByteArrayPage();
+          indexStorage = isInvertedIndex ?
+            new BlockIndexerStorageForShort(byteArray, isDictionary, !isDictionary, isSort) :
+            new BlockIndexerStorageForNoInvertedIndexForShort(byteArray, true);
+          byte[] compressInput = ByteUtil.flatten(indexStorage.getDataPage());
+          super.compressedDataPage = compressor.compressByte(compressInput);
+        } else {
+          ByteBuffer data = input.getByteBuffer();
+          indexStorage = new DummyBlockIndexerStorage();
+          super.compressedDataPage = compressor.compressByte(data);
+        }
         super.indexStorage = indexStorage;
       }
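
Note: in the plain branch (no inverted index, no local dictionary) the page's
backing ByteBuffer goes straight to the compressor, skipping the byte[][]
flatten copy. A standalone sketch of direct-buffer compression with
snappy-java (assuming the org.xerial.snappy API used by CarbonData's
SnappyCompressor; both buffers must be direct):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.xerial.snappy.Snappy;

    public class DirectCompressDemo {
      public static void main(String[] args) throws IOException {
        ByteBuffer uncompressed = ByteBuffer.allocateDirect(1024);
        uncompressed.put("some column page bytes".getBytes());
        uncompressed.flip();  // expose only the written region to the compressor
        ByteBuffer compressed =
            ByteBuffer.allocateDirect(Snappy.maxCompressedLength(uncompressed.remaining()));
        int size = Snappy.compress(uncompressed, compressed);  // no intermediate heap copy
        System.out.println("compressed " + size + " bytes");
      }
    }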
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
index be5109a..cdfb700 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
@@ -113,7 +114,7 @@ public class RLECodec implements ColumnPageCodec {
     }
 
     @Override
-    protected byte[] encodeData(ColumnPage input) throws IOException {
+    protected ByteBuffer encodeData(ColumnPage input) throws IOException {
       validateDataType(input.getDataType());
       this.dataType = input.getDataType();
       if (dataType == DataTypes.BYTE) {
@@ -140,7 +141,7 @@ public class RLECodec implements ColumnPageCodec {
         throw new UnsupportedOperationException(input.getDataType() +
             " does not support RLE encoding");
       }
-      return collectResult();
+      return ByteBuffer.wrap(collectResult());
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVLongStringStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVLongStringStatsCollector.java
deleted file mode 100644
index a7bb47e..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVLongStringStatsCollector.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.statistics;
-
-import org.apache.carbondata.core.util.ByteUtil;
-
-/**
- * This class is for the columns with varchar data type,
- * a string type which can hold more than 32000 characters
- */
-public class LVLongStringStatsCollector extends LVStringStatsCollector {
-
-  public static LVLongStringStatsCollector newInstance() {
-    return new LVLongStringStatsCollector();
-  }
-
-  private LVLongStringStatsCollector() {
-
-  }
-
-  @Override
-  protected byte[] getActualValue(byte[] value) {
-    byte[] actualValue;
-    assert (value.length >= 4);
-    if (value.length == 4) {
-      assert (value[0] == 0 && value[1] == 0);
-      actualValue = new byte[0];
-    } else {
-      int length = ByteUtil.toInt(value, 0);
-      assert (length > 0);
-      actualValue = new byte[value.length - 4];
-      System.arraycopy(value, 4, actualValue, 0, actualValue.length);
-    }
-    return actualValue;
-  }
-}
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVShortStringStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVShortStringStatsCollector.java
deleted file mode 100644
index 21b06d5..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVShortStringStatsCollector.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.statistics;
-
-import org.apache.carbondata.core.util.ByteUtil;
-
-/**
- * This class is for the columns with string data type which hold less than 32000 characters
- */
-public class LVShortStringStatsCollector extends LVStringStatsCollector {
-
-  public static LVShortStringStatsCollector newInstance() {
-    return new LVShortStringStatsCollector();
-  }
-
-  private LVShortStringStatsCollector() {
-
-  }
-
-  @Override
-  protected byte[] getActualValue(byte[] value) {
-    byte[] actualValue;
-    assert (value.length >= 2);
-    if (value.length == 2) {
-      assert (value[0] == 0 && value[1] == 0);
-      actualValue = new byte[0];
-    } else {
-      int length = ByteUtil.toShort(value, 0);
-      assert (length > 0);
-      actualValue = new byte[value.length - 2];
-      System.arraycopy(value, 2, actualValue, 0, actualValue.length);
-    }
-    return actualValue;
-  }
-}
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/StringStatsCollector.java
similarity index 88%
rename from core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
rename to core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/StringStatsCollector.java
index 8095bd5..a92dda0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/StringStatsCollector.java
@@ -25,7 +25,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 
-public abstract class LVStringStatsCollector implements ColumnPageStatsCollector {
+public class StringStatsCollector implements ColumnPageStatsCollector {
 
   /**
    * allowed character limit for to be considered for storing min max
@@ -44,6 +44,10 @@ public abstract class LVStringStatsCollector implements ColumnPageStatsCollector
    */
   private boolean ignoreWritingMinMax;
 
+  public static StringStatsCollector newInstance() {
+    return new StringStatsCollector();
+  }
+
   @Override
   public void updateNull(int rowId) {
 
@@ -84,27 +88,23 @@ public abstract class LVStringStatsCollector implements ColumnPageStatsCollector
 
   }
 
-  protected abstract byte[] getActualValue(byte[] value);
-
   @Override
   public void update(byte[] value) {
     // return if min/max need not be written
     if (isIgnoreMinMaxFlagSet(value)) {
       return;
     }
-    // input value is LV encoded
-    byte[] newValue = getActualValue(value);
     if (min == null) {
-      min = newValue;
+      min = value;
     }
     if (null == max) {
-      max = newValue;
+      max = value;
     }
-    if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(min, newValue) > 0) {
-      min = newValue;
+    if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(min, value) > 0) {
+      min = value;
     }
-    if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(max, newValue) < 0) {
-      max = newValue;
+    if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(max, value) < 0) {
+      max = value;
     }
   }
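
Note: with the LV subclasses removed, values reach the collector as raw bytes
and no length prefix is stripped before the min/max comparison. A minimal
usage sketch based on the methods shown above:

    import java.nio.charset.StandardCharsets;

    import org.apache.carbondata.core.datastore.page.statistics.StringStatsCollector;

    public class StatsCollectorDemo {
      public static void main(String[] args) {
        StringStatsCollector collector = StringStatsCollector.newInstance();
        // raw values: no 2-byte (short string) or 4-byte (varchar/binary) prefix anymore
        collector.update("banana".getBytes(StandardCharsets.UTF_8));
        collector.update("apple".getBytes(StandardCharsets.UTF_8));
        // min/max are tracked by byte-wise comparison: min is "apple", max is "banana"
      }
    }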
 
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
index 8b51834..b9fb4b5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
@@ -125,7 +125,8 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
         extendedBlocklet.setFilePath(extendedBlocklet.getFilePath().replace(tablePath, ""));
         extendedBlocklet.serializeData(stream, uniqueLocations, isCountJob);
       }
-      return new SnappyCompressor().compressByte(bos.toByteArray());
+      byte[] input = bos.toByteArray();
+      return new SnappyCompressor().compressByte(input, input.length);
     } catch (IOException e) {
       throw new RuntimeException(e);
     } finally {
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 27ce904..237d349 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -24,6 +24,7 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -318,8 +319,9 @@ public class BlockletDataMapUtil {
     }
     byte[] byteArray = stream.toByteArray();
     // Compress to reduce the size of schema
-    return CompressorFactory.NativeSupportedCompressor.SNAPPY.getCompressor().compressByte(
-        byteArray);
+    ByteBuffer byteBuffer =
+        CompressorFactory.NativeSupportedCompressor.SNAPPY.getCompressor().compressByte(byteArray);
+    return byteBuffer.array();
   }
 
   /**
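
Note: returning byteBuffer.array() hands back the whole backing array, which
equals the compressed payload only when the buffer is heap-backed and exactly
full. A standalone sketch of a length-aware copy for the general case:

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public class ArrayViewDemo {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.put(new byte[] {1, 2, 3});
        byte[] whole = buf.array();                                 // all 8 backing bytes
        byte[] exact = Arrays.copyOf(buf.array(), buf.position());  // only the 3 written bytes
        System.out.println(whole.length + " vs " + exact.length);   // 8 vs 3
      }
    }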
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
index 3a29d58..79d447a 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
@@ -74,6 +74,7 @@ public class CarbonTableInputFormatTest {
           new File("../hadoop/src/test/resources/data.csv").getCanonicalPath());
       loadModel = creator.createCarbonStore();
     } catch (Exception e) {
+      e.printStackTrace();
       Assert.fail("create table failed: " + e.getMessage());
     }
   }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
index b982149..20c6100 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
@@ -50,7 +50,9 @@ case class Rcd(booleanField: Boolean, shortField: Short, intField: Int, bigintFi
 class CustomizeCompressor extends Compressor {
   override def getName: String = "org.apache.carbondata.integration.spark.testsuite.dataload.CustomizeCompressor"
 
-  override def compressByte(unCompInput: Array[Byte]): Array[Byte] = unCompInput
+  override def compressByte(compInput: ByteBuffer): ByteBuffer = compInput
+
+  override def compressByte(unCompInput: Array[Byte]): ByteBuffer = ByteBuffer.wrap(unCompInput)
 
   override def compressByte(unCompInput: Array[Byte], byteSize: Int): Array[Byte] = unCompInput
 
@@ -58,10 +60,10 @@ class CustomizeCompressor extends Compressor {
 
   override def unCompressByte(compInput: Array[Byte], offset: Int, length: Int): Array[Byte] = compInput
 
-  override def compressShort(unCompInput: Array[Short]): Array[Byte] = {
+  override def compressShort(unCompInput: Array[Short]): ByteBuffer = {
     val buffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_SHORT)
     buffer.asShortBuffer().put(unCompInput)
-    compressByte(buffer.array())
+    compressByte(buffer)
   }
 
   override def unCompressShort(compInput: Array[Byte], offset: Int, length: Int): Array[Short] = {
@@ -71,7 +73,7 @@ class CustomizeCompressor extends Compressor {
     res
   }
 
-  override def compressInt(unCompInput: Array[Int]): Array[Byte] = {
+  override def compressInt(unCompInput: Array[Int]): ByteBuffer = {
     val buffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_INT)
     buffer.asIntBuffer().put(unCompInput)
     compressByte(buffer.array())
@@ -84,7 +86,7 @@ class CustomizeCompressor extends Compressor {
     res
   }
 
-  override def compressLong(unCompInput: Array[Long]): Array[Byte] = {
+  override def compressLong(unCompInput: Array[Long]): ByteBuffer = {
     val buffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_LONG)
     buffer.asLongBuffer().put(unCompInput)
     compressByte(buffer.array())
@@ -97,7 +99,7 @@ class CustomizeCompressor extends Compressor {
     res
   }
 
-  override def compressFloat(unCompInput: Array[Float]): Array[Byte] = {
+  override def compressFloat(unCompInput: Array[Float]): ByteBuffer = {
     val buffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_FLOAT)
     buffer.asFloatBuffer().put(unCompInput)
     compressByte(buffer.array())
@@ -110,7 +112,7 @@ class CustomizeCompressor extends Compressor {
     res
   }
 
-  override def compressDouble(unCompInput: Array[Double]): Array[Byte] = {
+  override def compressDouble(unCompInput: Array[Double]): ByteBuffer = {
     val buffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_DOUBLE)
     buffer.asDoubleBuffer().put(unCompInput)
     compressByte(buffer.array())
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index a1d215a..f29a94a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -347,8 +347,8 @@ class CGDataMapWriter(
     outStream.writeObject(maxMin)
     outStream.close()
     val bytes = compressor.compressByte(out.getBytes)
-    stream.write(bytes)
-    stream.writeInt(bytes.length)
+    stream.write(bytes.array(), 0, bytes.position())
+    stream.writeInt(bytes.position())
     stream.close()
   }
 
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 0bfca7a..4d848a6 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -361,11 +361,11 @@ class FGDataMapWriter(carbonTable: CarbonTable,
     outStream.writeObject(blockletListUpdated)
     outStream.close()
     val bytes = compressor.compressByte(out.getBytes)
-    stream.write(bytes)
+    stream.write(bytes.array(), 0, bytes.limit())
     maxMin +=
     ((blockletId, (blockletListUpdated.head._1, blockletListUpdated.last
-      ._1), position, bytes.length))
-    position += bytes.length
+      ._1), position, bytes.limit()))
+    position += bytes.limit()
     blockletList.clear()
   }
 
@@ -428,8 +428,8 @@ class FGDataMapWriter(carbonTable: CarbonTable,
     outStream.writeObject(maxMin)
     outStream.close()
     val bytes = compressor.compressByte(out.getBytes)
-    stream.write(bytes)
-    stream.writeInt(bytes.length)
+    stream.write(bytes.array(), 0, bytes.limit())
+    stream.writeInt(bytes.limit())
     stream.close()
   }
 }
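
Note: the CG writer above records the compressed length as bytes.position()
while the FG writer uses bytes.limit(); which of the two is the byte count
depends on the state of the buffer the compressor returns. A standalone Java
sketch of the two conventions (heap-backed buffers):

    import java.nio.ByteBuffer;

    public class LengthConventionDemo {
      public static void main(String[] args) {
        // written into and not flipped: position() is the count, limit() the capacity
        ByteBuffer written = ByteBuffer.allocate(16);
        written.put(new byte[] {1, 2, 3});
        System.out.println(written.position());  // 3
        System.out.println(written.limit());     // 16
        // wrapped around a finished array (or flipped after writing): limit() is the count
        ByteBuffer wrapped = ByteBuffer.wrap(new byte[] {1, 2, 3});
        System.out.println(wrapped.limit());     // 3
      }
    }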
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala
index 14d90f8..d983220 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala
@@ -17,9 +17,15 @@
 
 package org.apache.carbondata.spark.testsuite.sortexpr
 
+import java.nio.ByteBuffer
+
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
 /**
  * Test Class for sort expression query on multiple datatypes
  */
@@ -37,6 +43,57 @@ class AllDataTypesTestCaseSort extends QueryTest with BeforeAndAfterAll {
 
   }
 
+  test("simple string column encoding test") {
+    sql("drop table if exists source")
+    sql("create table source (id string, score int) stored as carbondata " +
+        "tblproperties ('local_dictionary_enable'='true', 
'long_string_columns'='id')")
+    sql("insert into source values ('aaa', 123)")
+    sql("select * from source").show
+    checkAnswer(sql("select * from source"), Seq(Row("aaa", 123)))
+    sql("drop table source")
+
+    sql("drop table if exists source")
+    sql("create table source (id string, score int) stored as carbondata " +
+        "tblproperties ('local_dictionary_enable'='false', 
'long_string_columns'='id')")
+    sql("insert into source values ('aaa', 123)")
+    sql("select * from source").show
+    checkAnswer(sql("select * from source"), Seq(Row("aaa", 123)))
+    sql("drop table source")
+
+    sql("create table source (id string, score int) stored as carbondata " +
+        "tblproperties ('local_dictionary_enable'='true')")
+    sql("insert into source values ('aaa', 123)")
+    sql("select * from source").show
+    checkAnswer(sql("select * from source"), Seq(Row("aaa", 123)))
+    sql("drop table source")
+
+    sql("drop table if exists source")
+    sql("create table source (id string, score int) stored as carbondata " +
+        "tblproperties ('local_dictionary_enable'='false')")
+    sql("insert into source values ('aaa', 123)")
+    sql("select * from source").show
+    checkAnswer(sql("select * from source"), Seq(Row("aaa", 123)))
+    sql("drop table source")
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "false")
+    sql("drop table if exists source")
+    sql("create table source (id string, score int) stored as carbondata " +
+        "tblproperties ('local_dictionary_enable'='true')")
+    sql("insert into source values ('aaa', 123)")
+    sql("select * from source").show
+    checkAnswer(sql("select * from source"), Seq(Row("aaa", 123)))
+    sql("drop table source")
+
+    sql("create table source (id string, score int) stored as carbondata " +
+        "tblproperties ('local_dictionary_enable'='false')")
+    sql("insert into source values ('aaa', 123)")
+    sql("select * from source").show
+    checkAnswer(sql("select * from source"), Seq(Row("aaa", 123)))
+    sql("drop table source")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
+      CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
+  }
+
   test("select empno,empname,utilization,count(salary),sum(empno) from 
alldatatypestablesort where empname in ('arvind','ayushi') group by 
empno,empname,utilization order by empno") {
     checkAnswer(
       sql("select empno,empname,utilization,count(salary),sum(empno) from 
alldatatypestablesort where empname in ('arvind','ayushi') group by 
empno,empname,utilization order by empno"),
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index aaa9a38..7489f1c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -37,9 +37,8 @@ import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory
 import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
 import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
-import org.apache.carbondata.core.datastore.page.statistics.LVLongStringStatsCollector;
-import org.apache.carbondata.core.datastore.page.statistics.LVShortStringStatsCollector;
 import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.StringStatsCollector;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.datastore.row.ComplexColumnInfo;
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
@@ -148,7 +147,7 @@ public class TablePage {
         }
         // set the stats collector according to the data type of the columns
         if (DataTypes.VARCHAR == dataType || DataTypes.BINARY == dataType) {
-          page.setStatsCollector(LVLongStringStatsCollector.newInstance());
+          page.setStatsCollector(StringStatsCollector.newInstance());
         } else if (DataTypeUtil.isPrimitiveColumn(spec.getSchemaDataType())) {
           if (spec.getSchemaDataType() == DataTypes.TIMESTAMP) {
             page.setStatsCollector(PrimitivePageStatsCollector.newInstance(DataTypes.LONG));
@@ -157,7 +156,7 @@ public class TablePage {
                 PrimitivePageStatsCollector.newInstance(spec.getSchemaDataType()));
           }
         } else {
-          page.setStatsCollector(LVShortStringStatsCollector.newInstance());
+          page.setStatsCollector(StringStatsCollector.newInstance());
         }
         noDictDimensionPages[tmpNumNoDictDimIdx++] = page;
       }
@@ -225,8 +224,7 @@ public class TablePage {
       for (int i = 0; i < noDictAndComplex.length; i++) {
         if (noDictionaryDimensionSpec.get(i).getSchemaDataType() == DataTypes.VARCHAR
             || noDictionaryDimensionSpec.get(i).getSchemaDataType() == DataTypes.BINARY) {
-          byte[] valueWithLength = addIntLengthToByteArray((byte[]) noDictAndComplex[i]);
-          noDictDimensionPages[i].putData(rowId, valueWithLength);
+          noDictDimensionPages[i].putData(rowId, noDictAndComplex[i]);
         } else if (i < noDictionaryCount) {
           if (DataTypeUtil
               .isPrimitiveColumn(noDictDimensionPages[i].getColumnSpec().getSchemaDataType())) {
@@ -240,10 +238,7 @@ public class TablePage {
             }
             noDictDimensionPages[i].putData(rowId, value);
           } else {
-            // noDictionary columns, since it is variable length, we need to prepare each
-            // element as LV result byte array (first two bytes are the length of the array)
-            byte[] valueWithLength = addShortLengthToByteArray((byte[]) noDictAndComplex[i]);
-            noDictDimensionPages[i].putData(rowId, valueWithLength);
+            noDictDimensionPages[i].putData(rowId, noDictAndComplex[i]);
           }
         } else {
           // complex columns
@@ -325,15 +320,6 @@ public class TablePage {
     return output;
   }
 
-  // Adds length as a integer element (first 4 bytes) to the head of the input byte array
-  private byte[] addIntLengthToByteArray(byte[] input) {
-    byte[] output = new byte[input.length + 4];
-    ByteBuffer buffer = ByteBuffer.wrap(output);
-    buffer.putInt(input.length);
-    buffer.put(input, 0, input.length);
-    return output;
-  }
-
   void encode() throws IOException {
     // encode dimensions and measure
     EncodedColumnPage[] dimensions = encodeAndCompressDimensions();
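
Note: with the prefixing helpers removed, TablePage feeds raw values to the
column page and the page itself tracks lengths. A standalone sketch of the
retired LV (length-value) layout that addIntLengthToByteArray produced:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class LvLayoutDemo {
      public static void main(String[] args) {
        byte[] value = "abc".getBytes(StandardCharsets.UTF_8);
        // old layout: 4-byte length header followed by the payload
        ByteBuffer lv = ByteBuffer.allocate(4 + value.length);
        lv.putInt(value.length);
        lv.put(value);
        System.out.println(lv.array().length);  // 7 = 4 header bytes + 3 payload bytes
      }
    }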
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index b714ec3..d2c1380 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -900,9 +900,9 @@ public class CarbonWriterBuilder {
       options = new HashMap<>();
     }
     CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
-    CarbonLoadModel build = builder.build(options, timestamp, taskNo);
-    setCsvHeader(build);
-    return build;
+    CarbonLoadModel model = builder.build(options, timestamp, taskNo);
+    setCsvHeader(model);
+    return model;
   }
 
   /* loop through all the parent column and
