http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinLong.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinLong.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinLong.java
index 83976f0..91eefc2 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinLong.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinLong.java
@@ -24,11 +24,13 @@ import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class UnCompressNonDecimalMaxMinLong
     implements ValueCompressonHolder.UnCompressValue<long[]> {
@@ -41,12 +43,18 @@ public class UnCompressNonDecimalMaxMinLong
   /**
    * longCompressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
   /**
    * value.
    */
   private long[] value;
 
+  private MeasureDataChunkStore<long[]> measureChunkStore;
+
+  private BigDecimal maxValue;
+
+  private double divisionFactor;
+
   @Override public void setValue(long[] value) {
     this.value = value;
 
@@ -69,7 +77,8 @@ public class UnCompressNonDecimalMaxMinLong
   }
 
   @Override
-  public ValueCompressonHolder.UnCompressValue 
uncompress(ValueCompressionUtil.DataType dataType) {
+  public ValueCompressonHolder.UnCompressValue uncompress(DataType dataType, 
byte[] compressData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
     return null;
   }
 
@@ -89,24 +98,33 @@ public class UnCompressNonDecimalMaxMinLong
     return new UnCompressNonDecimalMaxMinByte();
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    double maxValue = (double) maxValueObject;
-    double[] vals = new double[value.length];
-    CarbonReadDataHolder carbonDataHolder = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = value[i] / Math.pow(10, decimal);
-
-      if (value[i] == 0) {
-        vals[i] = maxValue;
-      } else {
-        BigDecimal diff = BigDecimal.valueOf(value[i] / Math.pow(10, decimal));
-        BigDecimal max = BigDecimal.valueOf(maxValue);
-        vals[i] = max.subtract(diff).doubleValue();
-      }
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException("Get long value is not supported");
+  }
 
+  @Override public double getDoubleValue(int index) {
+    long longValue = measureChunkStore.getLong(index);
+    if (longValue == 0) {
+      return maxValue.doubleValue();
+    } else {
+      BigDecimal diff = BigDecimal.valueOf(longValue / this.divisionFactor);
+      return maxValue.subtract(diff).doubleValue();
     }
-    carbonDataHolder.setReadableDoubleValues(vals);
-    return carbonDataHolder;
   }
 
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal value is not supported");
+  }
+
+  @Override public void setUncompressValues(long[] data, int decimalPlaces, Object maxValueObject) {
+    this.measureChunkStore =
+        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_LONG, data.length);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinShort.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinShort.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinShort.java
index 60476bd..8f1edad 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinShort.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinShort.java
@@ -24,11 +24,13 @@ import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class UnCompressNonDecimalMaxMinShort
     implements ValueCompressonHolder.UnCompressValue<short[]> {
@@ -40,12 +42,18 @@ public class UnCompressNonDecimalMaxMinShort
   /**
    * compressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
   /**
    * value.
    */
   private short[] value;
 
+  private MeasureDataChunkStore<short[]> measureChunkStore;
+
+  private BigDecimal maxValue;
+
+  private double divisionFactor;
+
   @Override public void setValue(short[] value) {
     this.value = value;
 
@@ -66,8 +74,9 @@ public class UnCompressNonDecimalMaxMinShort
     return byte1;
   }
 
-  @Override public ValueCompressonHolder.UnCompressValue uncompress(
-      ValueCompressionUtil.DataType dataTypeVal) {
+  @Override
+  public ValueCompressonHolder.UnCompressValue uncompress(DataType dataType, 
byte[] compressData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
     return null;
   }
 
@@ -87,24 +96,34 @@ public class UnCompressNonDecimalMaxMinShort
     return new UnCompressNonDecimalMaxMinByte();
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    double maxValue = (double) maxValueObject;
-    double[] vals = new double[value.length];
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = value[i] / Math.pow(10, decimal);
-
-      if (value[i] == 0) {
-        vals[i] = maxValue;
-      } else {
-        BigDecimal diff = BigDecimal.valueOf(value[i] / Math.pow(10, decimal));
-        BigDecimal max = BigDecimal.valueOf(maxValue);
-        vals[i] = max.subtract(diff).doubleValue();
-      }
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException("Get long value is not supported");
+  }
 
+  @Override public double getDoubleValue(int index) {
+    short shortValue = measureChunkStore.getShort(index);
+    if (shortValue == 0) {
+      return maxValue.doubleValue();
+    } else {
+      BigDecimal diff = BigDecimal.valueOf(shortValue / this.divisionFactor);
+      return maxValue.subtract(diff).doubleValue();
     }
-    dataHolder.setReadableDoubleValues(vals);
-    return dataHolder;
   }
 
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal value is not supported");
+  }
+
+  @Override
+  public void setUncompressValues(short[] data, int decimalPlaces, Object maxValueObject) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_SHORT, data.length);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalShort.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalShort.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalShort.java
index acf7dd8..dfae4f2 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalShort.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalShort.java
@@ -19,15 +19,18 @@
 
 package org.apache.carbondata.core.datastorage.store.compression.nondecimal;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class UnCompressNonDecimalShort implements 
ValueCompressonHolder.UnCompressValue<short[]> {
   /**
@@ -38,12 +41,16 @@ public class UnCompressNonDecimalShort implements 
ValueCompressonHolder.UnCompre
   /**
    * shortCompressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
   /**
    * value.
    */
   private short[] value;
 
+  private MeasureDataChunkStore<short[]> measureChunkStore;
+
+  private double divisionFactory;
+
   @Override public void setValue(short[] value) {
     this.value = value;
 
@@ -69,7 +76,8 @@ public class UnCompressNonDecimalShort implements 
ValueCompressonHolder.UnCompre
   }
 
   @Override
-  public ValueCompressonHolder.UnCompressValue 
uncompress(ValueCompressionUtil.DataType dataType) {
+  public ValueCompressonHolder.UnCompressValue uncompress(DataType dataType, 
byte[] compressData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
     return null;
   }
 
@@ -85,14 +93,27 @@ public class UnCompressNonDecimalShort implements 
ValueCompressonHolder.UnCompre
     return new UnCompressNonDecimalByte();
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    double[] vals = new double[value.length];
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = value[i] / Math.pow(10, decimal);
-    }
-    dataHolder.setReadableDoubleValues(vals);
-    return dataHolder;
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException("Get long value is not supported");
   }
 
+  @Override public double getDoubleValue(int index) {
+    return (measureChunkStore.getShort(index) / this.divisionFactory);
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal value is not supported");
+  }
+
+  @Override
+  public void setUncompressValues(short[] data, int decimalPlaces, Object maxValueObject) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_SHORT, data.length);
+    this.measureChunkStore.putData(data);
+    this.divisionFactory = Math.pow(10, decimalPlaces);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneByte.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneByte.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneByte.java
index 1a30f73..4449241 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneByte.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneByte.java
@@ -19,13 +19,16 @@
 
 package org.apache.carbondata.core.datastorage.store.compression.none;
 
+import java.math.BigDecimal;
+
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -39,7 +42,7 @@ public class UnCompressNoneByte implements 
UnCompressValue<byte[]> {
   /**
    * byteCompressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
 
   /**
    * value.
@@ -51,6 +54,8 @@ public class UnCompressNoneByte implements 
UnCompressValue<byte[]> {
    */
   private DataType actualDataType;
 
+  private MeasureDataChunkStore<byte[]> measureChunkStore;
+
   public UnCompressNoneByte(DataType actualDataType) {
     this.actualDataType = actualDataType;
   }
@@ -68,9 +73,12 @@ public class UnCompressNoneByte implements 
UnCompressValue<byte[]> {
     this.value = value;
   }
 
-  @Override public UnCompressValue uncompress(DataType dataType) {
+  @Override
+  public UnCompressValue uncompress(DataType dataType, byte[] data, int 
offset, int length,
+      int mantissa, Object maxValueObject) {
     UnCompressValue byte1 = ValueCompressionUtil.getUnCompressNone(dataType, 
actualDataType);
-    ValueCompressonHolder.unCompress(dataType, byte1, value);
+    ValueCompressonHolder
+        .unCompress(dataType, byte1, data, offset, length, mantissa, 
maxValueObject);
     return byte1;
   }
 
@@ -95,34 +103,25 @@ public class UnCompressNoneByte implements 
UnCompressValue<byte[]> {
     return new UnCompressNoneByte(actualDataType);
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    switch (actualDataType) {
-      case DATA_BIGINT:
-        return unCompressLong();
-      default:
-        return unCompressDouble();
-    }
+  @Override public long getLongValue(int index) {
+    return measureChunkStore.getByte(index);
+  }
 
+  @Override public double getDoubleValue(int index) {
+    return measureChunkStore.getByte(index);
   }
 
-  private CarbonReadDataHolder unCompressDouble() {
-    CarbonReadDataHolder dataHldr = new CarbonReadDataHolder();
-    double[] vals = new double[value.length];
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = value[i];
-    }
-    dataHldr.setReadableDoubleValues(vals);
-    return dataHldr;
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal is not supported");
   }
 
-  private CarbonReadDataHolder unCompressLong() {
-    CarbonReadDataHolder dataHldr = new CarbonReadDataHolder();
-    long[] vals = new long[value.length];
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = value[i];
-    }
-    dataHldr.setReadableLongValues(vals);
-    return dataHldr;
+  @Override public void setUncompressValues(byte[] data, int decimalPlaces, Object maxValueObject) {
+    this.measureChunkStore =
+        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_BYTE, data.length);
+    this.measureChunkStore.putData(data);
   }
 
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneDefault.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneDefault.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneDefault.java
index 1c7f651..8789ecf 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneDefault.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneDefault.java
@@ -19,14 +19,16 @@
 
 package org.apache.carbondata.core.datastorage.store.compression.none;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -39,7 +41,7 @@ public class UnCompressNoneDefault implements 
UnCompressValue<double[]> {
   /**
    * doubleCompressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
   /**
    * value.
    */
@@ -47,6 +49,8 @@ public class UnCompressNoneDefault implements 
UnCompressValue<double[]> {
 
   private DataType actualDataType;
 
+  private MeasureDataChunkStore<double[]> measureChunkStore;
+
   public UnCompressNoneDefault(DataType actualDataType) {
     this.actualDataType = actualDataType;
   }
@@ -71,7 +75,9 @@ public class UnCompressNoneDefault implements 
UnCompressValue<double[]> {
     return byte1;
   }
 
-  @Override public UnCompressValue uncompress(DataType dataType) {
+  @Override
+  public UnCompressValue uncompress(DataType dataType, byte[] data, int 
offset, int length,
+      int decimalPlaces, Object maxValueObject) {
     return null;
   }
 
@@ -88,10 +94,27 @@ public class UnCompressNoneDefault implements 
UnCompressValue<double[]> {
     this.value = ValueCompressionUtil.convertToDoubleArray(buffer, 
value.length);
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    dataHolder.setReadableDoubleValues(value);
-    return dataHolder;
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException("Get long value is not supported");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    return measureChunkStore.getDouble(index);
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal is not supported");
   }
 
+  @Override
+  public void setUncompressValues(double[] data, int decimalPlaces, Object maxValueObject) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, data.length);
+    this.measureChunkStore.putData(data);
+
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneFloat.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneFloat.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneFloat.java
deleted file mode 100644
index 01c9422..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneFloat.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.compression.none;
-
-import java.nio.ByteBuffer;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastorage.store.compression.Compressor;
-import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
-import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
-
-public class UnCompressNoneFloat implements 
ValueCompressonHolder.UnCompressValue<float[]> {
-  /**
-   * Attribute for Carbon LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(UnCompressNoneFloat.class.getName());
-  /**
-   * compressor
-   */
-  private static Compressor compressor = CompressorFactory.getInstance();
-  /**
-   * value.
-   */
-  private float[] value;
-
-  private DataType actualDataType;
-
-  public UnCompressNoneFloat(DataType actualDataType) {
-    this.actualDataType = actualDataType;
-  }
-
-  @Override public void setValue(float[] value) {
-    this.value = value;
-
-  }
-
-  @Override public ValueCompressonHolder.UnCompressValue getNew() {
-    try {
-      return (ValueCompressonHolder.UnCompressValue) clone();
-    } catch (CloneNotSupportedException ex5) {
-      LOGGER.error(ex5, ex5.getMessage());
-    }
-    return null;
-  }
-
-  @Override public ValueCompressonHolder.UnCompressValue compress() {
-    UnCompressNoneByte byte1 = new UnCompressNoneByte(actualDataType);
-    byte1.setValue(compressor.compressFloat(value));
-    return byte1;
-  }
-
-  @Override public void setValueInBytes(byte[] value) {
-    ByteBuffer buffer = ByteBuffer.wrap(value);
-    this.value = ValueCompressionUtil.convertToFloatArray(buffer, 
value.length);
-  }
-
-  /**
-   * @see ValueCompressonHolder.UnCompressValue#getCompressorObject()
-   */
-  @Override public ValueCompressonHolder.UnCompressValue getCompressorObject() 
{
-    return new UnCompressNoneByte(this.actualDataType);
-  }
-
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    double[] val = new double[value.length];
-    for (int i = 0; i < val.length; i++) {
-      val[i] = value[i];
-    }
-    dataHolder.setReadableDoubleValues(val);
-    return dataHolder;
-  }
-
-  @Override
-  public ValueCompressonHolder.UnCompressValue 
uncompress(ValueCompressionUtil.DataType dataType) {
-    return null;
-  }
-
-  @Override public byte[] getBackArrayData() {
-    return ValueCompressionUtil.convertToBytes(value);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneInt.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneInt.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneInt.java
index 4edd330..c4b7205 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneInt.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneInt.java
@@ -19,14 +19,17 @@
 
 package org.apache.carbondata.core.datastorage.store.compression.none;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -39,7 +42,7 @@ public class UnCompressNoneInt implements 
ValueCompressonHolder.UnCompressValue<
   /**
    * intCompressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
   /**
    * value.
    */
@@ -47,6 +50,8 @@ public class UnCompressNoneInt implements 
ValueCompressonHolder.UnCompressValue<
 
   private DataType actualDataType;
 
+  private MeasureDataChunkStore<int[]> measureChunkStore;
+
   public UnCompressNoneInt(DataType actualDataType) {
     this.actualDataType = actualDataType;
   }
@@ -74,7 +79,9 @@ public class UnCompressNoneInt implements 
ValueCompressonHolder.UnCompressValue<
     return byte1;
   }
 
-  @Override public ValueCompressonHolder.UnCompressValue uncompress(DataType 
dataType) {
+  @Override
+  public UnCompressValue uncompress(DataType dataType, byte[] data, int 
offset, int length,
+      int decimalPlaces, Object maxValueObject) {
     return null;
   }
 
@@ -90,54 +97,26 @@ public class UnCompressNoneInt implements 
ValueCompressonHolder.UnCompressValue<
     return new UnCompressNoneByte(this.actualDataType);
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    switch (actualDataType) {
-      case DATA_SHORT:
-        return unCompressShort();
-      case DATA_INT:
-        return unCompressInt();
-      case DATA_LONG:
-      case DATA_BIGINT:
-        return unCompressLong();
-      default:
-        return unCompressDouble();
-    }
+  @Override public long getLongValue(int index) {
+    return measureChunkStore.getInt(index);
   }
 
-  private CarbonReadDataHolder unCompressShort() {
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    short[] vals = new short[value.length];
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = (short)value[i];
-    }
-    dataHolder.setReadableShortValues(vals);
-    return dataHolder;
+  @Override public double getDoubleValue(int index) {
+    return measureChunkStore.getInt(index);
   }
 
-  private CarbonReadDataHolder unCompressInt() {
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    dataHolder.setReadableIntValues(value);
-    return dataHolder;
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal is not supported");
   }
 
-  private CarbonReadDataHolder unCompressDouble() {
-    CarbonReadDataHolder dataHolderInfoObj = new CarbonReadDataHolder();
-    double[] vals = new double[value.length];
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = value[i];
-    }
-    dataHolderInfoObj.setReadableDoubleValues(vals);
-    return dataHolderInfoObj;
-  }
+  @Override public void setUncompressValues(int[] data, int decimalPlaces, Object maxValueObject) {
+    this.measureChunkStore =
+        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, data.length);
+    this.measureChunkStore.putData(data);
 
-  private CarbonReadDataHolder unCompressLong() {
-    CarbonReadDataHolder dataHolderInfoObj = new CarbonReadDataHolder();
-    long[] vals = new long[value.length];
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = value[i];
-    }
+  }
 
-    dataHolderInfoObj.setReadableLongValues(vals);
-    return dataHolderInfoObj;
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneLong.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneLong.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneLong.java
index 364d2fa..4b45b13 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneLong.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneLong.java
@@ -19,14 +19,17 @@
 
 package org.apache.carbondata.core.datastorage.store.compression.none;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -39,7 +42,7 @@ public class UnCompressNoneLong implements 
ValueCompressonHolder.UnCompressValue
   /**
    * longCompressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
   /**
    * value.
    */
@@ -47,6 +50,8 @@ public class UnCompressNoneLong implements 
ValueCompressonHolder.UnCompressValue
 
   private DataType actualDataType;
 
+  private MeasureDataChunkStore<long[]> measureChunkStore;
+
   public UnCompressNoneLong(DataType actualDataType) {
     this.actualDataType = actualDataType;
   }
@@ -60,8 +65,7 @@ public class UnCompressNoneLong implements 
ValueCompressonHolder.UnCompressValue
     try {
       return (ValueCompressonHolder.UnCompressValue) clone();
     } catch (CloneNotSupportedException clnNotSupportedExc) {
-      LOGGER.error(clnNotSupportedExc,
-          clnNotSupportedExc.getMessage());
+      LOGGER.error(clnNotSupportedExc, clnNotSupportedExc.getMessage());
     }
     return null;
   }
@@ -74,7 +78,8 @@ public class UnCompressNoneLong implements 
ValueCompressonHolder.UnCompressValue
   }
 
   @Override
-  public ValueCompressonHolder.UnCompressValue 
uncompress(ValueCompressionUtil.DataType dType) {
+  public UnCompressValue uncompress(DataType dataType, byte[] data, int 
offset, int length,
+      int decimalPlaces, Object maxValueObject) {
     return null;
   }
 
@@ -94,32 +99,26 @@ public class UnCompressNoneLong implements 
ValueCompressonHolder.UnCompressValue
     return new UnCompressNoneByte(this.actualDataType);
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    switch (actualDataType) {
-      case DATA_BIGINT:
-        return unCompressLong();
-      default:
-        return unCompressDouble();
-    }
+  @Override public long getLongValue(int index) {
+    return measureChunkStore.getLong(index);
+  }
+
+  @Override public double getDoubleValue(int index) {
+    return measureChunkStore.getLong(index);
   }
 
-  private CarbonReadDataHolder unCompressDouble() {
-    CarbonReadDataHolder dataHldr = new CarbonReadDataHolder();
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal is not 
supported");
+  }
 
-    double[] vals = new double[value.length];
+  @Override public void setUncompressValues(long[] data, int decimalPlaces, 
Object maxValueObject) {
+    this.measureChunkStore =
+        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_LONG, 
data.length);
+    this.measureChunkStore.putData(data);
 
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = value[i];
-    }
-    dataHldr.setReadableDoubleValues(vals);
-    return dataHldr;
   }
 
-  private CarbonReadDataHolder unCompressLong() {
-    CarbonReadDataHolder dataHldr = new CarbonReadDataHolder();
-    long[] vals = new long[value.length];
-    System.arraycopy(value, 0, vals, 0, vals.length);
-    dataHldr.setReadableLongValues(vals);
-    return dataHldr;
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneShort.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneShort.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneShort.java
index 7b8762d..8eb4ee9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneShort.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/none/UnCompressNoneShort.java
@@ -19,14 +19,17 @@
 
 package org.apache.carbondata.core.datastorage.store.compression.none;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -40,13 +43,15 @@ public class UnCompressNoneShort implements 
ValueCompressonHolder.UnCompressValu
   /**
    * shortCompressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
 
   /**
    * value.
    */
   private short[] shortValue;
 
+  private MeasureDataChunkStore<short[]> measureChunkStore;
+
   private DataType actualDataType;
 
   public UnCompressNoneShort(DataType actualDataType) {
@@ -72,7 +77,9 @@ public class UnCompressNoneShort implements 
ValueCompressonHolder.UnCompressValu
     return byte1;
   }
 
-  @Override public ValueCompressonHolder.UnCompressValue uncompress(DataType 
dataType) {
+  @Override
+  public UnCompressValue uncompress(DataType dataType, byte[] data, int 
offset, int length,
+      int decimalPlaces, Object maxValueObject) {
     return null;
   }
 
@@ -92,36 +99,27 @@ public class UnCompressNoneShort implements 
ValueCompressonHolder.UnCompressValu
     return new UnCompressNoneByte(this.actualDataType);
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    switch (actualDataType) {
-      case DATA_BIGINT:
-        return unCompressLong();
-      default:
-        return unCompressDouble();
-    }
+  @Override public long getLongValue(int index) {
+    return measureChunkStore.getShort(index);
   }
 
-  private CarbonReadDataHolder unCompressDouble() {
-    CarbonReadDataHolder dataHldr = new CarbonReadDataHolder();
-
-    double[] vals = new double[shortValue.length];
+  @Override public double getDoubleValue(int index) {
+    return measureChunkStore.getShort(index);
+  }
 
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = shortValue[i];
-    }
-    dataHldr.setReadableDoubleValues(vals);
-    return dataHldr;
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal is not 
supported");
   }
 
-  private CarbonReadDataHolder unCompressLong() {
-    CarbonReadDataHolder dataHldr = new CarbonReadDataHolder();
+  @Override
+  public void setUncompressValues(short[] data, int decimalPlaces, Object 
maxValueObject) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_SHORT, data.length);
+    this.measureChunkStore.putData(data);
 
-    long[] vals = new long[shortValue.length];
+  }
 
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = shortValue[i];
-    }
-    dataHldr.setReadableLongValues(vals);
-    return dataHldr;
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimal.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimal.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimal.java
index bf2c11b..e33f8a4 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimal.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimal.java
@@ -18,11 +18,11 @@
  */
 package org.apache.carbondata.core.datastorage.store.compression.type;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.util.BigDecimalCompressionFinder;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -50,47 +50,41 @@ public class UnCompressBigDecimal<T> implements 
UnCompressValue<T> {
     this.rightPart = rightPart;
   }
 
-  @Override
-  public void setValue(T value) {
+  @Override public void setValue(T value) {
     Object[] values = (Object[]) value;
     leftPart.setValue(values[0]);
     rightPart.setValue(values[1]);
   }
 
-  @Override
-  public void setValueInBytes(byte[] value) {
+  @Override public void setValueInBytes(byte[] value) {
     // TODO Auto-generated method stub
 
   }
 
-  @Override
-  public UnCompressValue<T> getNew() {
+  @Override public UnCompressValue<T> getNew() {
     UnCompressValue leftUnCompressClone = leftPart.getNew();
     UnCompressValue rightUnCompressClone = rightPart.getNew();
-    return new UnCompressBigDecimal(compressionFinder, leftUnCompressClone,
-        rightUnCompressClone);
+    return new UnCompressBigDecimal(compressionFinder, leftUnCompressClone, 
rightUnCompressClone);
   }
 
-  @Override
-  public UnCompressValue compress() {
-    UnCompressBigDecimal byt = new UnCompressBigDecimal<>(compressionFinder,
-        leftPart.compress(), rightPart.compress());
+  @Override public UnCompressValue compress() {
+    UnCompressBigDecimal byt =
+        new UnCompressBigDecimal<>(compressionFinder, leftPart.compress(), 
rightPart.compress());
     return byt;
   }
 
   @Override
-  public UnCompressValue uncompress(DataType dataType) {
+  public UnCompressValue uncompress(DataType dataType, byte[] data, int 
offset, int length,
+      int decimalPlaces, Object maxValueObject) {
     // TODO Auto-generated method stub
     return null;
   }
 
-  @Override
-  public byte[] getBackArrayData() {
+  @Override public byte[] getBackArrayData() {
     byte[] leftdata = leftPart.getBackArrayData();
     byte[] rightdata = rightPart.getBackArrayData();
     ByteBuffer byteBuffer = ByteBuffer
-        .allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + leftdata.length
-            + rightdata.length);
+        .allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + leftdata.length + 
rightdata.length);
     byteBuffer.putInt(leftdata.length);
     byteBuffer.put(leftdata);
     byteBuffer.put(rightdata);
@@ -98,15 +92,28 @@ public class UnCompressBigDecimal<T> implements 
UnCompressValue<T> {
     return byteBuffer.array();
   }
 
-  @Override
-  public UnCompressValue getCompressorObject() {
-    return new UnCompressBigDecimalByte<>(compressionFinder,
-        leftPart.getCompressorObject(), rightPart.getCompressorObject());
+  @Override public UnCompressValue getCompressorObject() {
+    return new UnCompressBigDecimalByte<>(compressionFinder, 
leftPart.getCompressorObject(),
+        rightPart.getCompressorObject(), 0, null);
   }
 
-  @Override
-  public CarbonReadDataHolder getValues(int decimal, Object maxValue) {
+  @Override public void setUncompressValues(T data, int decimalPlaces, Object 
maxValueObject) {
     // TODO Auto-generated method stub
-    return null;
+
+  }
+
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException("Get long is not supported");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    throw new UnsupportedOperationException("Get double is not supported");
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal is not 
supported");
+  }
+
+  @Override public void freeMemory() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimalByte.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimalByte.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimalByte.java
index 98990d0..32203c5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimalByte.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimalByte.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.util.BigDecimalCompressionFinder;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -39,22 +38,28 @@ public class UnCompressBigDecimalByte<T> implements 
UnCompressValue<T> {
 
   private UnCompressValue rightPart;
 
-  public UnCompressBigDecimalByte(
-      BigDecimalCompressionFinder compressionFinder, UnCompressValue leftPart,
-      UnCompressValue rightPart) {
+  private double divisionFactor;
+
+  private boolean isDecimalPlacesNotZero;
+
+  public UnCompressBigDecimalByte(BigDecimalCompressionFinder 
compressionFinder,
+      UnCompressValue leftPart, UnCompressValue rightPart, int decimalPalacs,
+      Object maxValueObject) {
     this.compressionFinder = compressionFinder;
     this.leftPart = leftPart;
     this.rightPart = rightPart;
+    if (decimalPalacs > 0) {
+      this.isDecimalPlacesNotZero = true;
+    }
+    this.divisionFactor = Math.pow(10, decimalPalacs);
   }
 
-  @Override
-  public void setValue(T value) {
+  @Override public void setValue(T value) {
     byte[] values = (byte[]) value;
     ByteBuffer buffer = ByteBuffer.wrap(values);
     buffer.rewind();
     int leftPartLen = buffer.getInt();
-    int rightPartLen = values.length - leftPartLen
-        - CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    int rightPartLen = values.length - leftPartLen - 
CarbonCommonConstants.INT_SIZE_IN_BYTE;
     byte[] leftValue = new byte[leftPartLen];
     byte[] rightValue = new byte[rightPartLen];
     buffer.get(leftValue);
@@ -63,72 +68,78 @@ public class UnCompressBigDecimalByte<T> implements 
UnCompressValue<T> {
     rightPart.setValue(rightValue);
   }
 
-  @Override
-  public void setValueInBytes(byte[] value) {
+  @Override public void setValueInBytes(byte[] value) {
     // TODO Auto-generated method stub
 
   }
 
-  @Override
-  public UnCompressValue<T> getNew() {
+  @Override public UnCompressValue<T> getNew() {
     UnCompressValue leftUnCompressClone = leftPart.getNew();
     UnCompressValue rightUnCompressClone = rightPart.getNew();
-    return new UnCompressBigDecimal(compressionFinder, leftUnCompressClone,
-        rightUnCompressClone);
+    return new UnCompressBigDecimal(compressionFinder, leftUnCompressClone, 
rightUnCompressClone);
   }
 
-  @Override
-  public UnCompressValue compress() {
-    UnCompressBigDecimal byt = new UnCompressBigDecimal<>(compressionFinder,
-        leftPart.compress(), rightPart.compress());
+  @Override public UnCompressValue compress() {
+    UnCompressBigDecimal byt =
+        new UnCompressBigDecimal<>(compressionFinder, leftPart.compress(), 
rightPart.compress());
     return byt;
   }
 
   @Override
-  public UnCompressValue uncompress(DataType dataType) {
+  public UnCompressValue uncompress(DataType dataType, byte[] data, int 
offset, int length,
+      int decimalPlaces, Object maxValueObject) {
+    ByteBuffer buffer = ByteBuffer.wrap(data, offset, length);
+    int leftPathLength = buffer.getInt();
+    int rightPartLength = length - leftPathLength - 
CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    Long[] maxValue = (Long[]) maxValueObject;
     ValueCompressonHolder.UnCompressValue left = leftPart
-        .uncompress(compressionFinder.getLeftConvertedDataType());
+        .uncompress(compressionFinder.getLeftConvertedDataType(), data,
+            offset + CarbonCommonConstants.INT_SIZE_IN_BYTE, leftPathLength, 
decimalPlaces,
+            maxValue[0]);
     ValueCompressonHolder.UnCompressValue right = rightPart
-        .uncompress(compressionFinder.getRightConvertedDataType());
-    return new UnCompressBigDecimalByte<>(compressionFinder, left, right);
+        .uncompress(compressionFinder.getRightConvertedDataType(), data,
+            offset + CarbonCommonConstants.INT_SIZE_IN_BYTE + leftPathLength, 
rightPartLength,
+            decimalPlaces, maxValue[1]);
+    return new UnCompressBigDecimalByte<>(compressionFinder, left, right, 
decimalPlaces,
+        maxValueObject);
   }
 
-  @Override
-  public byte[] getBackArrayData() {
+  @Override public byte[] getBackArrayData() {
     return null;
   }
 
-  @Override
-  public UnCompressValue getCompressorObject() {
-    return new UnCompressBigDecimal<>(compressionFinder,
-        leftPart.getCompressorObject(), rightPart.getCompressorObject());
+  @Override public UnCompressValue getCompressorObject() {
+    return new UnCompressBigDecimal<>(compressionFinder, 
leftPart.getCompressorObject(),
+        rightPart.getCompressorObject());
   }
 
-  @Override
-  public CarbonReadDataHolder getValues(int decimal, Object maxValue) {
-    Long[] maxValues = (Long[]) maxValue;
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    CarbonReadDataHolder leftDataHolder = leftPart.getValues(decimal,
-        maxValues[0]);
-    long[] leftVals = leftDataHolder.getReadableLongValue();
-    int size = leftVals.length;
-    long[] rightVals = new long[size];
-    if (decimal > 0) {
-      CarbonReadDataHolder rightDataHolder = rightPart.getValues(decimal,
-          maxValues[1]);
-      rightVals = rightDataHolder.getReadableLongValue();
-    }
-    BigDecimal[] values = new BigDecimal[size];
-    for (int i = 0; i < size; i++) {
-      String decimalPart = Double.toString(rightVals[i]/Math.pow(10, decimal));
-      String bigdStr = Long.toString(leftVals[i])
-          + CarbonCommonConstants.POINT
-          + decimalPart.substring(decimalPart.indexOf(".")+1, 
decimalPart.length());
-      BigDecimal bigdVal = new BigDecimal(bigdStr);
-      values[i] = bigdVal;
+  @Override public void setUncompressValues(T data, int decimalPlaces, Object 
maxValueObject) {
+    // do nothing
+  }
+
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException("Get long is not supported");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    throw new UnsupportedOperationException("Get double is not supported");
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    long leftValue = leftPart.getLongValue(index);
+    long rightValue = 0;
+    if (isDecimalPlacesNotZero) {
+      rightValue = rightPart.getLongValue(index);
     }
-    dataHolder.setReadableBigDecimalValues(values);
-    return dataHolder;
+    String decimalPart = Double.toString(rightValue / this.divisionFactor);
+    String bigdStr = Long.toString(leftValue) + CarbonCommonConstants.POINT + 
decimalPart
+        .substring(decimalPart.indexOf(".") + 1, decimalPart.length());
+    return new BigDecimal(bigdStr);
+  }
+
+  @Override public void freeMemory() {
+    leftPart.freeMemory();
+    rightPart.freeMemory();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonReadDataHolder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonReadDataHolder.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonReadDataHolder.java
index 7bf3dbe..8dd6724 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonReadDataHolder.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonReadDataHolder.java
@@ -21,70 +21,30 @@ package 
org.apache.carbondata.core.datastorage.store.dataholder;
 
 import java.math.BigDecimal;
 
+import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
+
 // This class is used with Uncompressor to hold the decompressed column chunk 
in memory
 public class CarbonReadDataHolder {
 
-  private byte[][] byteValues;
-
-  private short[] shortValues;
-
-  private int[] intValues;
-
-  private long[] longValues;
-
-  private BigDecimal[] bigDecimalValues;
-
-  private double[] doubleValues;
-
-  public void setReadableByteValues(byte[][] byteValues) {
-    this.byteValues = byteValues;
-  }
-
-  public void setReadableShortValues(short[] shortValues) {
-    this.shortValues = shortValues;
-  }
-
-  public void setReadableIntValues(int[] intValues) {
-    this.intValues = intValues;
-  }
-
-  public void setReadableLongValues(long[] longValues) {
-    this.longValues = longValues;
-  }
-
-  public void setReadableBigDecimalValues(BigDecimal[] bigDecimalValues) {
-    this.bigDecimalValues = bigDecimalValues;
-  }
-
-  public void setReadableDoubleValues(double[] doubleValues) {
-    this.doubleValues = doubleValues;
-  }
-
-  public byte[] getReadableByteArrayValueByIndex(int index) {
-    return this.byteValues[index];
-  }
-
-  public long getReadableShortValueByIndex(int index) {
-    return this.shortValues[index];
-  }
+  private ValueCompressonHolder.UnCompressValue unCompressValue;
 
-  public long getReadableIntValueByIndex(int index) {
-    return this.intValues[index];
+  public CarbonReadDataHolder(ValueCompressonHolder.UnCompressValue 
unCompressValue) {
+    this.unCompressValue = unCompressValue;
   }
 
-  public long[] getReadableLongValue() {
-    return this.longValues;
-  }
   public long getReadableLongValueByIndex(int index) {
-    return this.longValues[index];
+    return this.unCompressValue.getLongValue(index);
   }
 
   public BigDecimal getReadableBigDecimalValueByIndex(int index) {
-    return this.bigDecimalValues[index];
+    return this.unCompressValue.getBigDecimalValue(index);
   }
 
   public double getReadableDoubleValueByIndex(int index) {
-    return this.doubleValues[index];
+    return this.unCompressValue.getDoubleValue(index);
   }
 
+  public void freeMemory() {
+    unCompressValue.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
index 5b9e28b..0c29143 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
@@ -27,5 +27,4 @@ public class HeavyCompressedDoubleArrayDataInMemoryStore
   public HeavyCompressedDoubleArrayDataInMemoryStore(WriterCompressModel 
compressionModel) {
     super(compressionModel);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/keygenerator/mdkey/NumberCompressor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/keygenerator/mdkey/NumberCompressor.java
 
b/core/src/main/java/org/apache/carbondata/core/keygenerator/mdkey/NumberCompressor.java
index fa61f38..96fbb08 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/keygenerator/mdkey/NumberCompressor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/keygenerator/mdkey/NumberCompressor.java
@@ -139,15 +139,15 @@ public class NumberCompressor {
     return words;
   }
 
-  public int[] unCompress(byte[] key) {
-    int ls = key.length;
+  public int[] unCompress(byte[] key, int offset, int length) {
+    int ls = length;
     int arrayLength = (ls * BYTE_LENGTH) / bitsLength;
     long[] words = new long[getWordsSizeFromBytesSize(ls)];
-    unCompressVal(key, ls, words);
+    unCompressVal(key, ls, words, offset);
     return getArray(words, arrayLength);
   }
 
-  private void unCompressVal(byte[] key, int ls, long[] words) {
+  private void unCompressVal(byte[] key, int ls, long[] words, int offset) {
     for (int i = 0; i < words.length; i++) {
       long l = 0;
       ls -= BYTE_LENGTH;
@@ -160,7 +160,7 @@ public class NumberCompressor {
       }
       for (int j = ls; j < m; j++) {
         l <<= BYTE_LENGTH;
-        l ^= key[j] & 0xFF;
+        l ^= key[offset+j] & 0xFF;
       }
       words[i] = l;
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/memory/MemoryAllocatorFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/memory/MemoryAllocatorFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/memory/MemoryAllocatorFactory.java
new file mode 100644
index 0000000..554e35d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/memory/MemoryAllocatorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.memory;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * Factory class to get the memory allocator instance
+ */
+public class MemoryAllocatorFactory {
+
+  private MemoryAllocator memoryAllocator;
+
+  public static final MemoryAllocatorFactory INSATANCE = new 
MemoryAllocatorFactory();
+
+  private MemoryAllocatorFactory() {
+    boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.USE_OFFHEAP_IN_QUERY_PROCSSING,
+            CarbonCommonConstants.USE_OFFHEAP_IN_QUERY_PROCSSING_DEFAULT));
+    if (offHeap) {
+      memoryAllocator = MemoryAllocator.UNSAFE;
+    } else {
+      memoryAllocator = MemoryAllocator.HEAP;
+    }
+  }
+
+  public MemoryAllocator getMemoryAllocator() {
+    return memoryAllocator;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java 
b/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java
index 2e96123..49a9dc4 100644
--- a/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java
+++ b/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java
@@ -25,8 +25,14 @@ public final class CarbonUnsafe {
 
   public static final int BYTE_ARRAY_OFFSET;
 
+  public static final int SHORT_ARRAY_OFFSET;
+
+  public static final int INT_ARRAY_OFFSET;
+
   public static final int LONG_ARRAY_OFFSET;
 
+  public static final int DOUBLE_ARRAY_OFFSET;
+
   public static Unsafe unsafe;
 
   static {
@@ -39,10 +45,16 @@ public final class CarbonUnsafe {
     }
     if (unsafe != null) {
       BYTE_ARRAY_OFFSET = unsafe.arrayBaseOffset(byte[].class);
+      SHORT_ARRAY_OFFSET = unsafe.arrayBaseOffset(short[].class);
+      INT_ARRAY_OFFSET = unsafe.arrayBaseOffset(int[].class);
       LONG_ARRAY_OFFSET = unsafe.arrayBaseOffset(long[].class);
+      DOUBLE_ARRAY_OFFSET = unsafe.arrayBaseOffset(double[].class);
     } else {
       BYTE_ARRAY_OFFSET = 0;
+      SHORT_ARRAY_OFFSET = 0;
+      INT_ARRAY_OFFSET = 0;
       LONG_ARRAY_OFFSET = 0;
+      DOUBLE_ARRAY_OFFSET = 0;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 602c8ae..98eaedf 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -574,7 +574,7 @@ public class CarbonMetadataUtil {
       //meta
       PresenceMeta presenceMeta = new PresenceMeta();
       presenceMeta.setPresent_bit_streamIsSet(true);
-      presenceMeta.setPresent_bit_stream(CompressorFactory.getInstance()
+      
presenceMeta.setPresent_bit_stream(CompressorFactory.getInstance().getCompressor()
           
.compressByte(blockletInfoColumnar.getMeasureNullValueIndex()[i].toByteArray()));
       dataChunk.setPresence(presenceMeta);
       //TODO : PresenceMeta needs to be implemented and set here

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 5196631..e61d1d2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -48,6 +48,8 @@ import 
org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentif
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
 import 
org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
@@ -331,7 +333,7 @@ public final class CarbonUtil {
 
   public static String getBadLogPath(String storeLocation) {
     String badLogStoreLocation =
-            
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+        
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
     badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
 
     return badLogStoreLocation;
@@ -395,27 +397,21 @@ public final class CarbonUtil {
     int cmpResult = 0;
     while (high >= low) {
       int mid = (low + high) / 2;
-      cmpResult = ByteUtil.UnsafeComparer.INSTANCE
-          .compareTo(dimColumnDataChunk.getCompleteDataChunk(), mid * 
compareValue.length,
-              compareValue.length, compareValue, 0, compareValue.length);
+      cmpResult = dimColumnDataChunk.compareTo(mid, compareValue);
       if (cmpResult < 0) {
         low = mid + 1;
       } else if (cmpResult > 0) {
         high = mid - 1;
       } else {
         int currentIndex = mid;
-        if(!matchUpLimit) {
-          while (currentIndex - 1 >= 0 && ByteUtil.UnsafeComparer.INSTANCE
-              .compareTo(dimColumnDataChunk.getCompleteDataChunk(),
-                  (currentIndex - 1) * compareValue.length, 
compareValue.length, compareValue, 0,
-                  compareValue.length) == 0) {
+        if (!matchUpLimit) {
+          while (currentIndex - 1 >= 0
+              && dimColumnDataChunk.compareTo(currentIndex - 1, compareValue) 
== 0) {
             --currentIndex;
           }
         } else {
-          while (currentIndex + 1 <= high && ByteUtil.UnsafeComparer.INSTANCE
-              .compareTo(dimColumnDataChunk.getCompleteDataChunk(),
-                  (currentIndex + 1) * compareValue.length, 
compareValue.length, compareValue, 0,
-                  compareValue.length) == 0) {
+          while (currentIndex + 1 <= high
+              && dimColumnDataChunk.compareTo(currentIndex + 1, compareValue) 
== 0) {
             currentIndex++;
           }
         }
@@ -436,10 +432,8 @@ public final class CarbonUtil {
    */
   public static int nextLesserValueToTarget(int currentIndex,
       FixedLengthDimensionDataChunk dimColumnDataChunk, byte[] compareValue) {
-    while (currentIndex - 1 >= 0 && ByteUtil.UnsafeComparer.INSTANCE
-        .compareTo(dimColumnDataChunk.getCompleteDataChunk(),
-            (currentIndex - 1) * compareValue.length, compareValue.length, 
compareValue, 0,
-            compareValue.length) >= 0) {
+    while (currentIndex - 1 >= 0
+        && dimColumnDataChunk.compareTo(currentIndex - 1, compareValue) >= 0) {
       --currentIndex;
     }
 
@@ -458,10 +452,8 @@ public final class CarbonUtil {
    */
   public static int nextGreaterValueToTarget(int currentIndex,
       FixedLengthDimensionDataChunk dimColumnDataChunk, byte[] compareValue, 
int numerOfRows) {
-    while (currentIndex + 1 < numerOfRows && ByteUtil.UnsafeComparer.INSTANCE
-        .compareTo(dimColumnDataChunk.getCompleteDataChunk(),
-            (currentIndex + 1) * compareValue.length, compareValue.length, 
compareValue, 0,
-            compareValue.length) <= 0) {
+    while (currentIndex + 1 < numerOfRows
+        && dimColumnDataChunk.compareTo(currentIndex + 1, compareValue) <= 0) {
       ++currentIndex;
     }
 
@@ -469,17 +461,17 @@ public final class CarbonUtil {
   }
 
   public static int[] getUnCompressColumnIndex(int totalLength, byte[] 
columnIndexData,
-      NumberCompressor numberCompressor) {
-    ByteBuffer buffer = ByteBuffer.wrap(columnIndexData);
-    buffer.rewind();
+      NumberCompressor numberCompressor, int offset) {
+    ByteBuffer buffer = ByteBuffer.wrap(columnIndexData, offset, totalLength);
     int indexDataLength = buffer.getInt();
     byte[] indexData = new byte[indexDataLength];
     byte[] indexMap =
         new byte[totalLength - indexDataLength - 
CarbonCommonConstants.INT_SIZE_IN_BYTE];
     buffer.get(indexData);
     buffer.get(indexMap);
-    return 
UnBlockIndexer.uncompressIndex(numberCompressor.unCompress(indexData),
-        numberCompressor.unCompress(indexMap));
+    return UnBlockIndexer
+        .uncompressIndex(numberCompressor.unCompress(indexData, 0, 
indexData.length),
+            numberCompressor.unCompress(indexMap, 0, indexMap.length));
   }
 
   /**
@@ -503,8 +495,7 @@ public final class CarbonUtil {
    * @return
    * @throws IOException
    */
-  public static int[] getCardinalityFromLevelMetadataFile(String levelPath)
-      throws IOException {
+  public static int[] getCardinalityFromLevelMetadataFile(String levelPath) 
throws IOException {
     DataInputStream dataInputStream = null;
     int[] cardinality = null;
 
@@ -528,9 +519,9 @@ public final class CarbonUtil {
 
   public static void writeLevelCardinalityFile(String loadFolderLoc, String 
tableName,
       int[] dimCardinality) throws KettleException {
-    String levelCardinalityFilePath = loadFolderLoc + File.separator +
-        CarbonCommonConstants.LEVEL_METADATA_FILE + tableName
-        + CarbonCommonConstants.CARBON_METADATA_EXTENSION;
+    String levelCardinalityFilePath =
+        loadFolderLoc + File.separator + 
CarbonCommonConstants.LEVEL_METADATA_FILE + tableName
+            + CarbonCommonConstants.CARBON_METADATA_EXTENSION;
     FileOutputStream fileOutputStream = null;
     FileChannel channel = null;
     try {
@@ -588,7 +579,7 @@ public final class CarbonUtil {
       case ":":
       case "^":
       case "\\":
-      case"$":
+      case "$":
       case "+":
       case "?":
       case "(":
@@ -610,9 +601,9 @@ public final class CarbonUtil {
    */
   public static String checkAndAppendHDFSUrl(String filePath) {
     String currentPath = filePath;
-    if (null != filePath && filePath.length() != 0 &&
-        FileFactory.getFileType(filePath) != FileFactory.FileType.HDFS &&
-        FileFactory.getFileType(filePath) != FileFactory.FileType.VIEWFS) {
+    if (null != filePath && filePath.length() != 0
+        && FileFactory.getFileType(filePath) != FileFactory.FileType.HDFS
+        && FileFactory.getFileType(filePath) != FileFactory.FileType.VIEWFS) {
       String baseDFSUrl = CarbonProperties.getInstance()
           .getProperty(CarbonCommonConstants.CARBON_DDL_BASE_HDFS_URL);
       if (null != baseDFSUrl) {
@@ -775,7 +766,7 @@ public final class CarbonUtil {
    * Below method will be used to check whether particular encoding is present
    * in the dimension or not
    *
-   * @param encoding  encoding to search
+   * @param encoding encoding to search
    * @return if encoding is present in dimension
    */
   public static boolean hasEncoding(List<Encoding> encodings, Encoding 
encoding) {
@@ -801,12 +792,12 @@ public final class CarbonUtil {
   /**
    * below method is to check whether it is complex data type
    *
-   * @param dataType  data type to be searched
+   * @param dataType data type to be searched
    * @return if data type is present
    */
   public static boolean hasComplexDataType(DataType dataType) {
     switch (dataType) {
-      case ARRAY :
+      case ARRAY:
       case STRUCT:
       case MAP:
         return true;
@@ -836,8 +827,7 @@ public final class CarbonUtil {
   public static boolean[] getImplicitColumnArray(QueryDimension[] 
queryDimensions) {
     boolean[] implicitColumnArray = new boolean[queryDimensions.length];
     for (int i = 0; i < queryDimensions.length; i++) {
-      implicitColumnArray[i] = queryDimensions[i]
-          .getDimension().hasEncoding(Encoding.IMPLICIT);
+      implicitColumnArray[i] = 
queryDimensions[i].getDimension().hasEncoding(Encoding.IMPLICIT);
     }
     return implicitColumnArray;
   }
@@ -854,8 +844,7 @@ public final class CarbonUtil {
   /**
    * Below method will be used to read the data file matadata
    */
-  public static DataFileFooter readMetadatFile(TableBlockInfo tableBlockInfo)
-      throws IOException {
+  public static DataFileFooter readMetadatFile(TableBlockInfo tableBlockInfo) 
throws IOException {
     AbstractDataFileFooterConverter fileFooterConverter =
         DataFileFooterConverterFactory.getInstance()
             .getDataFileFooterConverter(tableBlockInfo.getVersion());
@@ -864,6 +853,7 @@ public final class CarbonUtil {
 
   /**
    * The method calculate the B-Tree metadata size.
+   *
    * @param tableBlockInfo
    * @return
    */
@@ -887,7 +877,6 @@ public final class CarbonUtil {
     }
   }
 
-
   /**
    * Below method will be used to get the surrogate key
    *
@@ -959,7 +948,7 @@ public final class CarbonUtil {
     for (CarbonDimension carbonDimension : tableDimensionList) {
       List<CarbonDimension> childs = 
carbonDimension.getListOfChildDimensions();
       //assuming complex dimensions will always be atlast
-      if(null != childs && childs.size() > 0) {
+      if (null != childs && childs.size() > 0) {
         break;
       }
       if (carbonDimension.isColumnar() && 
hasEncoding(carbonDimension.getEncoder(),
@@ -1107,6 +1096,7 @@ public final class CarbonUtil {
       }
     }
   }
+
   /**
    * Below method will be used to get all the block index info from index file
    *
@@ -1169,7 +1159,7 @@ public final class CarbonUtil {
       fileReader =
           FileFactory.getDataInputStream(csvFilePath, 
FileFactory.getFileType(csvFilePath));
       bufferedReader = new BufferedReader(new InputStreamReader(fileReader,
-              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
       readLine = bufferedReader.readLine();
     } finally {
       CarbonUtil.closeStreams(fileReader, bufferedReader);
@@ -1183,8 +1173,7 @@ public final class CarbonUtil {
    * @param a
    * @param num
    */
-  public static String printLine(String a, int num)
-  {
+  public static String printLine(String a, int num) {
     StringBuilder builder = new StringBuilder();
     for (int i = 0; i < num; i++) {
       builder.append(a);
@@ -1238,12 +1227,13 @@ public final class CarbonUtil {
    * @param dataChunkBytes datachunk thrift object in bytes
    * @return data chunk thrift object
    */
-  public static DataChunk2 readDataChunk(byte[] dataChunkBytes) throws 
IOException {
+  public static DataChunk2 readDataChunk(byte[] dataChunkBytes, int offset, 
int length)
+      throws IOException {
     return (DataChunk2) read(dataChunkBytes, new ThriftReader.TBaseCreator() {
-        @Override public TBase create() {
-          return new DataChunk2();
-        }
-      });
+      @Override public TBase create() {
+        return new DataChunk2();
+      }
+    }, offset, length);
   }
 
   /**
@@ -1255,8 +1245,9 @@ public final class CarbonUtil {
    * @return thrift object
    * @throws IOException any problem while converting the object
    */
-  private static TBase read(byte[] data, TBaseCreator creator) throws 
IOException {
-    ByteArrayInputStream stream = new ByteArrayInputStream(data);
+  private static TBase read(byte[] data, TBaseCreator creator, int offset, int 
length)
+      throws IOException {
+    ByteArrayInputStream stream = new ByteArrayInputStream(data, offset, 
length);
     TProtocol binaryIn = new TCompactProtocol(new TIOStreamTransport(stream));
     TBase t = creator.create();
     try {
@@ -1365,6 +1356,23 @@ public final class CarbonUtil {
     return outputArray;
   }
 
+  public static void freeMemory(DimensionColumnDataChunk[] 
dimensionColumnDataChunk,
+      MeasureColumnDataChunk[] measureColumnDataChunks) {
+    if (null != measureColumnDataChunks) {
+      for (int i = 0; i < measureColumnDataChunks.length; i++) {
+        if (null != measureColumnDataChunks[i]) {
+          measureColumnDataChunks[i].freeMemory();
+        }
+      }
+    }
+    if (null != dimensionColumnDataChunk) {
+      for (int i = 0; i < dimensionColumnDataChunk.length; i++) {
+        if (null != dimensionColumnDataChunk[i]) {
+          dimensionColumnDataChunk[i].freeMemory();
+        }
+      }
+    }
+  }
 
   /**
    * This method will check if dictionary and its metadata file exists for a 
given column
@@ -1374,31 +1382,27 @@ public final class CarbonUtil {
    * @return
    */
   public static boolean isFileExistsForGivenColumn(String carbonStorePath,
-          DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier) {
+      DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier) {
     PathService pathService = CarbonCommonFactory.getPathService();
     CarbonTablePath carbonTablePath = 
pathService.getCarbonTablePath(carbonStorePath,
-            dictionaryColumnUniqueIdentifier.getCarbonTableIdentifier());
-
-    String dictionaryFilePath =
-            
carbonTablePath.getDictionaryFilePath(dictionaryColumnUniqueIdentifier
-                    .getColumnIdentifier().getColumnId());
-    String dictionaryMetadataFilePath =
-            
carbonTablePath.getDictionaryMetaFilePath(dictionaryColumnUniqueIdentifier
-                    .getColumnIdentifier().getColumnId());
+        dictionaryColumnUniqueIdentifier.getCarbonTableIdentifier());
+
+    String dictionaryFilePath = carbonTablePath.getDictionaryFilePath(
+        dictionaryColumnUniqueIdentifier.getColumnIdentifier().getColumnId());
+    String dictionaryMetadataFilePath = 
carbonTablePath.getDictionaryMetaFilePath(
+        dictionaryColumnUniqueIdentifier.getColumnIdentifier().getColumnId());
     // check if both dictionary and its metadata file exists for a given column
     return isFileExists(dictionaryFilePath) && 
isFileExists(dictionaryMetadataFilePath);
   }
 
   /**
-   *
    * @param tableInfo
    * @param invalidBlockVOForSegmentId
    * @param updateStatusMngr
    * @return
    */
   public static boolean isInvalidTableBlock(TableBlockInfo tableInfo,
-      UpdateVO invalidBlockVOForSegmentId,
-      SegmentUpdateStatusManager updateStatusMngr) {
+      UpdateVO invalidBlockVOForSegmentId, SegmentUpdateStatusManager 
updateStatusMngr) {
 
     if (!updateStatusMngr.isBlockValid(tableInfo.getSegmentId(),
         CarbonTablePath.getCarbonDataFileName(tableInfo.getFilePath()) + 
CarbonTablePath

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
index c6a2c3a..081ed9f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
@@ -32,28 +32,23 @@ import 
org.apache.carbondata.core.datastorage.store.compression.ReaderCompressMo
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
 import 
org.apache.carbondata.core.datastorage.store.compression.WriterCompressModel;
-import 
org.apache.carbondata.core.datastorage.store.compression.decimal.UnCompressByteArray;
 import 
org.apache.carbondata.core.datastorage.store.compression.decimal.UnCompressMaxMinByte;
 import 
org.apache.carbondata.core.datastorage.store.compression.decimal.UnCompressMaxMinDefault;
-import 
org.apache.carbondata.core.datastorage.store.compression.decimal.UnCompressMaxMinFloat;
 import 
org.apache.carbondata.core.datastorage.store.compression.decimal.UnCompressMaxMinInt;
 import 
org.apache.carbondata.core.datastorage.store.compression.decimal.UnCompressMaxMinLong;
 import 
org.apache.carbondata.core.datastorage.store.compression.decimal.UnCompressMaxMinShort;
 import 
org.apache.carbondata.core.datastorage.store.compression.nondecimal.UnCompressNonDecimalByte;
 import 
org.apache.carbondata.core.datastorage.store.compression.nondecimal.UnCompressNonDecimalDefault;
-import 
org.apache.carbondata.core.datastorage.store.compression.nondecimal.UnCompressNonDecimalFloat;
 import 
org.apache.carbondata.core.datastorage.store.compression.nondecimal.UnCompressNonDecimalInt;
 import 
org.apache.carbondata.core.datastorage.store.compression.nondecimal.UnCompressNonDecimalLong;
 import 
org.apache.carbondata.core.datastorage.store.compression.nondecimal.UnCompressNonDecimalMaxMinByte;
 import 
org.apache.carbondata.core.datastorage.store.compression.nondecimal.UnCompressNonDecimalMaxMinDefault;
-import 
org.apache.carbondata.core.datastorage.store.compression.nondecimal.UnCompressNonDecimalMaxMinFloat;
 import 
org.apache.carbondata.core.datastorage.store.compression.nondecimal.UnCompressNonDecimalMaxMinInt;
 import 
org.apache.carbondata.core.datastorage.store.compression.nondecimal.UnCompressNonDecimalMaxMinLong;
 import 
org.apache.carbondata.core.datastorage.store.compression.nondecimal.UnCompressNonDecimalMaxMinShort;
 import 
org.apache.carbondata.core.datastorage.store.compression.nondecimal.UnCompressNonDecimalShort;
 import 
org.apache.carbondata.core.datastorage.store.compression.none.UnCompressNoneByte;
 import 
org.apache.carbondata.core.datastorage.store.compression.none.UnCompressNoneDefault;
-import 
org.apache.carbondata.core.datastorage.store.compression.none.UnCompressNoneFloat;
 import 
org.apache.carbondata.core.datastorage.store.compression.none.UnCompressNoneInt;
 import 
org.apache.carbondata.core.datastorage.store.compression.none.UnCompressNoneLong;
 import 
org.apache.carbondata.core.datastorage.store.compression.none.UnCompressNoneShort;
@@ -337,8 +332,6 @@ public final class ValueCompressionUtil {
         return getUnCompressNonDecimalMaxMin(changedDataType);
       case BIGINT:
         return getUnCompressNonDecimal(changedDataType);
-      case BIGDECIMAL:
-        return new 
UnCompressByteArray(UnCompressByteArray.ByteArrayType.BIG_DECIMAL);
       default:
         throw new IllegalArgumentException("unsupported compType: " + 
compType);
     }
@@ -633,8 +626,6 @@ public final class ValueCompressionUtil {
         return new UnCompressNoneInt(actualDataType);
       case DATA_LONG:
         return new UnCompressNoneLong(actualDataType);
-      case DATA_FLOAT:
-        return new UnCompressNoneFloat(actualDataType);
       default:
         return new UnCompressNoneDefault(actualDataType);
     }
@@ -654,8 +645,6 @@ public final class ValueCompressionUtil {
         return new UnCompressMaxMinInt(actualDataType);
       case DATA_LONG:
         return new UnCompressMaxMinLong(actualDataType);
-      case DATA_FLOAT:
-        return new UnCompressMaxMinFloat(actualDataType);
       default:
         return new UnCompressMaxMinDefault(actualDataType);
     }
@@ -675,8 +664,6 @@ public final class ValueCompressionUtil {
         return new UnCompressNonDecimalInt();
       case DATA_LONG:
         return new UnCompressNonDecimalLong();
-      case DATA_FLOAT:
-        return new UnCompressNonDecimalFloat();
       default:
         return new UnCompressNonDecimalDefault();
     }
@@ -696,8 +683,6 @@ public final class ValueCompressionUtil {
         return new UnCompressNonDecimalMaxMinInt();
       case DATA_LONG:
         return new UnCompressNonDecimalMaxMinLong();
-      case DATA_FLOAT:
-        return new UnCompressNonDecimalMaxMinFloat();
       default:
         return new UnCompressNonDecimalMaxMinDefault();
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java
 
b/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java
index 7304b3c..e48d469 100644
--- 
a/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java
+++ 
b/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java
@@ -106,10 +106,10 @@ public abstract class AbstractScannedResultCollector 
implements ScannedResultCol
         case LONG:
           return 
dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
         case DECIMAL:
-          return  org.apache.spark.sql.types.Decimal.apply(
-                  
dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index));
+          return org.apache.spark.sql.types.Decimal
+              
.apply(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index));
         default:
-          return  
dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
+          return 
dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
       }
     }
     return null;
@@ -136,7 +136,7 @@ public abstract class AbstractScannedResultCollector 
implements ScannedResultCol
       ByteArrayWrapper key = null;
       for (int i = 0; i < listBasedResult.size(); i++) {
         // get the key
-        key = (ByteArrayWrapper)listBasedResult.get(i)[0];
+        key = (ByteArrayWrapper) listBasedResult.get(i)[0];
         // unpack the key with table block key generator
         data = tableBlockExecutionInfos.getBlockKeyGenerator()
             .getKeyArray(key.getDictionaryKey(), 
tableBlockExecutionInfos.getMaskedByteForBlock());

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/scan/complextypes/ComplexQueryType.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/scan/complextypes/ComplexQueryType.java
 
b/core/src/main/java/org/apache/carbondata/scan/complextypes/ComplexQueryType.java
index d2c6703..3296017 100644
--- 
a/core/src/main/java/org/apache/carbondata/scan/complextypes/ComplexQueryType.java
+++ 
b/core/src/main/java/org/apache/carbondata/scan/complextypes/ComplexQueryType.java
@@ -1,4 +1,3 @@
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -49,17 +48,8 @@ public class ComplexQueryType {
    */
   protected void copyBlockDataChunk(DimensionColumnDataChunk[] 
dimensionColumnDataChunks,
       int rowNumber, byte[] input) {
-    byte[] data = (byte[]) 
dimensionColumnDataChunks[blockIndex].getCompleteDataChunk();
-    if (null != 
dimensionColumnDataChunks[blockIndex].getAttributes().getInvertedIndexes()) {
-      System.arraycopy(data, 
dimensionColumnDataChunks[blockIndex].getAttributes()
-              .getInvertedIndexesReverse()[rowNumber] * 
dimensionColumnDataChunks[blockIndex]
-              .getAttributes().getColumnValueSize(), input, 0,
-          
dimensionColumnDataChunks[blockIndex].getAttributes().getColumnValueSize());
-    } else {
-      System.arraycopy(data,
-          rowNumber * 
dimensionColumnDataChunks[blockIndex].getAttributes().getColumnValueSize(),
-          input, 0, 
dimensionColumnDataChunks[blockIndex].getAttributes().getColumnValueSize());
-    }
+    byte[] data = 
dimensionColumnDataChunks[blockIndex].getChunkData(rowNumber);
+    System.arraycopy(data, 0, input, 0, data.length);
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java
 
b/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java
index 979a61b..ac717a9 100644
--- 
a/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java
+++ 
b/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java
@@ -91,7 +91,7 @@ public class PrimitiveQueryType extends ComplexQueryType 
implements GenericQuery
       DimensionColumnDataChunk[] dimensionDataChunks, int rowNumber,
       DataOutputStream dataOutputStream) throws IOException {
     byte[] currentVal =
-        new 
byte[dimensionDataChunks[blockIndex].getAttributes().getColumnValueSize()];
+        new byte[dimensionDataChunks[blockIndex].getColumnValueSize()];
     copyBlockDataChunk(dimensionDataChunks, rowNumber, currentVal);
     dataOutputStream.write(currentVal);
   }


Reply via email to