add file format version enum

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/0ef3fb81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/0ef3fb81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/0ef3fb81

Branch: refs/heads/master
Commit: 0ef3fb81883e782fffef0beae01a429684893960
Parents: 90bc366
Author: jackylk <jacky.li...@huawei.com>
Authored: Thu Dec 1 23:02:16 2016 +0800
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Dec 1 22:04:26 2016 +0530

----------------------------------------------------------------------
 conf/dataload.properties.template               |   4 +-
 .../core/carbon/ColumnarFormatVersion.java      |  50 +++
 .../carbon/datastore/block/TableBlockInfo.java  |  11 +-
 .../chunk/reader/CarbonDataReaderFactory.java   |  17 +-
 .../metadata/blocklet/DataFileFooter.java       |   7 +-
 .../core/constants/CarbonCommonConstants.java   |   2 +-
 .../util/AbstractDataFileFooterConverter.java   |   4 +-
 .../core/util/CarbonMetadataUtil.java           |  11 +-
 .../carbondata/core/util/CarbonProperties.java  |  49 ++-
 .../apache/carbondata/core/util/CarbonUtil.java |   5 -
 .../core/util/DataFileFooterConverter.java      |   3 +-
 .../core/util/DataFileFooterConverter2.java     |   3 +-
 .../util/DataFileFooterConverterFactory.java    |  14 +-
 .../carbon/datastore/block/BlockInfoTest.java   |  13 +-
 .../datastore/block/TableBlockInfoTest.java     |  33 +-
 .../datastore/block/TableTaskInfoTest.java      |   9 +-
 .../core/util/CarbonMetadataUtilTest.java       |   2 +-
 .../carbondata/core/util/CarbonUtilTest.java    |   9 +-
 .../core/util/DataFileFooterConverterTest.java  |   5 +-
 .../carbondata/examples/CarbonExample.scala     |   2 +-
 .../carbondata/hadoop/CarbonInputFormat.java    |   6 +-
 .../carbondata/hadoop/CarbonInputSplit.java     |  21 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   7 +-
 .../TestQueryWithOldCarbonDataFile.scala        |   8 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   7 +-
 .../store/CarbonDataWriterFactory.java          |  16 +-
 .../store/CarbonFactDataHandlerColumnar.java    |   4 +-
 .../store/writer/CarbonFactDataWriterImpl2.java | 285 --------------
 ...actDataWriterImplForIntIndexAndAggBlock.java | 379 ------------------
 .../writer/v1/CarbonFactDataWriterImplV1.java   | 382 +++++++++++++++++++
 .../writer/v2/CarbonFactDataWriterImplV2.java   | 288 ++++++++++++++
 .../carbon/datastore/BlockIndexStoreTest.java   |  33 +-
 32 files changed, 891 insertions(+), 798 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/conf/dataload.properties.template
----------------------------------------------------------------------
diff --git a/conf/dataload.properties.template b/conf/dataload.properties.template
index 59cad4a..d5e9d6a 100644
--- a/conf/dataload.properties.template
+++ b/conf/dataload.properties.template
@@ -18,14 +18,14 @@
 
 #carbon store path
 # you should change to the code path of your local machine
-carbon.storelocation=/Users/wangfei/code/incubator-carbondata/examples/spark2/target/store
+carbon.storelocation=/Users/jackylk/code/incubator-carbondata/examples/spark2/target/store
 
 #true: use kettle to load data
 #false: use new flow to load data
 use_kettle=true
 
 # you should change to the code path of your local machine
-carbon.kettle.home=/Users/wangfei/code/incubator-carbondata/processing/carbonplugins
+carbon.kettle.home=/Users/jackylk/code/incubator-carbondata/processing/carbonplugins
 
 #csv delimiter character
 delimiter=,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/carbon/ColumnarFormatVersion.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/ColumnarFormatVersion.java b/core/src/main/java/org/apache/carbondata/core/carbon/ColumnarFormatVersion.java
new file mode 100644
index 0000000..bef345c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/ColumnarFormatVersion.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.carbon;
+
+public enum ColumnarFormatVersion { // enumerates the columnar file format versions; each constant carries its numeric id
+  V1((short)1), // format version number 1
+  V2((short)2); // format version number 2
+
+  private short version; // numeric id of this constant, assigned once in the constructor
+  ColumnarFormatVersion(short version) {
+    this.version = version;
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnarFormatV" + version; // yields "ColumnarFormatV1"/"ColumnarFormatV2"; NOT the constant name ("V1"/"V2") that valueOf(String) accepts
+  }
+
+  public short number() { // numeric form of this version (1 or 2)
+    return version;
+  }
+
+  public static ColumnarFormatVersion valueOf(short version) { // maps a numeric id to its constant; overloads the compiler-generated valueOf(String)
+    switch (version) {
+      case 1:
+        return V1;
+      case 2:
+        return V2;
+      default: // any number other than 1 or 2 is rejected with IllegalArgumentException
+        throw new IllegalArgumentException("invalid format version: " + 
version);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
index 0d60567..802a116 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.carbon.datastore.block;
 
 import java.io.Serializable;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath.DataFileUtil;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
@@ -57,14 +58,14 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String[] locations;
 
-  private short version;
+  private ColumnarFormatVersion version;
   /**
    * The class holds the blockletsinfo
    */
   private BlockletInfos blockletInfos = new BlockletInfos();
 
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, 
String[] locations,
-      long blockLength, short version) {
+      long blockLength, ColumnarFormatVersion version) {
     this.filePath = FileFactory.getUpdatedFilePath(filePath);
     this.blockOffset = blockOffset;
     this.segmentId = segmentId;
@@ -84,7 +85,7 @@ public class TableBlockInfo implements Distributable, 
Serializable {
    * @param blockletInfos
    */
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, 
String[] locations,
-      long blockLength, BlockletInfos blockletInfos, short version) {
+      long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion 
version) {
     this.filePath = FileFactory.getUpdatedFilePath(filePath);
     this.blockOffset = blockOffset;
     this.segmentId = segmentId;
@@ -259,11 +260,11 @@ public class TableBlockInfo implements Distributable, 
Serializable {
     this.blockletInfos = blockletInfos;
   }
 
-  public short getVersion() {
+  public ColumnarFormatVersion getVersion() {
     return version;
   }
 
-  public void setVersion(short version) {
+  public void setVersion(ColumnarFormatVersion version) {
     this.version = version;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java
index 08a1869..9bf7e62 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.carbondata.core.carbon.datastore.chunk.reader;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import 
org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.v1.CompressedDimensionChunkFileBasedReaderV1;
 import 
org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.v2.CompressedDimensionChunkFileBasedReaderV2;
 import 
org.apache.carbondata.core.carbon.datastore.chunk.reader.measure.v1.CompressedMeasureChunkFileBasedReaderV1;
@@ -60,15 +61,17 @@ public class CarbonDataReaderFactory {
    * @param filePath            carbon data file path
    * @return dimension column data reader based on version number
    */
-  public DimensionColumnChunkReader getDimensionColumnChunkReader(short 
version,
+  public DimensionColumnChunkReader 
getDimensionColumnChunkReader(ColumnarFormatVersion version,
       BlockletInfo blockletInfo, int[] eachColumnValueSize, String filePath) {
     switch (version) {
-      case 2:
+      case V2:
         return new CompressedDimensionChunkFileBasedReaderV2(blockletInfo, 
eachColumnValueSize,
             filePath);
-      default:
+      case V1:
         return new CompressedDimensionChunkFileBasedReaderV1(blockletInfo, 
eachColumnValueSize,
             filePath);
+      default:
+        throw new IllegalArgumentException("invalid format version: " + 
version);
     }
   }
 
@@ -80,13 +83,15 @@ public class CarbonDataReaderFactory {
    * @param filePath     carbon data file path
    * @return measure column data reader based on version number
    */
-  public MeasureColumnChunkReader getMeasureColumnChunkReader(short version,
+  public MeasureColumnChunkReader 
getMeasureColumnChunkReader(ColumnarFormatVersion version,
       BlockletInfo blockletInfo, String filePath) {
     switch (version) {
-      case 2:
+      case V2:
         return new CompressedMeasureChunkFileBasedReaderV2(blockletInfo, 
filePath);
-      default:
+      case V1:
         return new CompressedMeasureChunkFileBasedReaderV1(blockletInfo, 
filePath);
+      default:
+        throw new IllegalArgumentException("invalid format version: " + 
version);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
index be235ba..a82bac9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.core.carbon.metadata.blocklet;
 import java.io.Serializable;
 import java.util.List;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.BlockInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex;
 import 
org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
@@ -38,7 +39,7 @@ public class DataFileFooter implements Serializable {
   /**
    * version used for data compatibility
    */
-  private short versionId;
+  private ColumnarFormatVersion versionId;
 
   /**
    * total number of rows in this file
@@ -73,14 +74,14 @@ public class DataFileFooter implements Serializable {
   /**
    * @return the versionId
    */
-  public short getVersionId() {
+  public ColumnarFormatVersion getVersionId() {
     return versionId;
   }
 
   /**
    * @param versionId the versionId to set
    */
-  public void setVersionId(short versionId) {
+  public void setVersionId(ColumnarFormatVersion versionId) {
     this.versionId = versionId;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 443c8c4..1ac2ba1 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -900,7 +900,7 @@ public final class CarbonCommonConstants {
   /**
    * current data file version
    */
-  public static final short CARBON_DATA_FILE_DEFAULT_VERSION = 2;
+  public static final String CARBON_DATA_FILE_DEFAULT_VERSION = "V2";
   /**
    * number of column data will read in IO operation
    * during query execution

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
 
b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index db9c9be..7f50c34 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.BlockInfo;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
@@ -99,7 +100,8 @@ public abstract class AbstractDataFileFooterConverter {
         dataFileFooter = new DataFileFooter();
         TableBlockInfo tableBlockInfo = tableBlockInfoList.get(counter++);
         tableBlockInfo.setBlockOffset(readBlockIndexInfo.getOffset());
-        tableBlockInfo.setVersion((short) readIndexHeader.getVersion());
+        tableBlockInfo.setVersion(
+            ColumnarFormatVersion.valueOf((short) 
readIndexHeader.getVersion()));
         int blockletSize = getBlockletSize(readBlockIndexInfo);
         tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize);
         dataFileFooter.setBlockletIndex(blockletIndex);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 4f8a435..bdd6fae 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -30,6 +30,7 @@ import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.carbon.metadata.index.BlockIndexInfo;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -97,10 +98,9 @@ public class CarbonMetadataUtil {
     SegmentInfo segmentInfo = new SegmentInfo();
     segmentInfo.setNum_cols(columnSchemaList.size());
     
segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(cardinalities));
-    short version = Short.parseShort(
-        
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
+    ColumnarFormatVersion version = 
CarbonProperties.getInstance().getFormatVersion();
     FileFooter footer = new FileFooter();
-    footer.setVersion(version);
+    footer.setVersion(version.number());
     footer.setNum_rows(getTotalNumberOfRows(infoList));
     footer.setSegment_info(segmentInfo);
     footer.setTable_columns(columnSchemaList);
@@ -476,9 +476,8 @@ public class CarbonMetadataUtil {
     
segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(columnCardinality));
     // create index header object
     IndexHeader indexHeader = new IndexHeader();
-    short version = Short.parseShort(
-        
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
-    indexHeader.setVersion(version);
+    ColumnarFormatVersion version = 
CarbonProperties.getInstance().getFormatVersion();
+    indexHeader.setVersion(version.number());
     // set the segment info
     indexHeader.setSegment_info(segmentInfo);
     // set the column names

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index adb0e6a..f4ec63d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -27,6 +27,7 @@ import java.util.Properties;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 
 public final class CarbonProperties {
@@ -263,27 +264,25 @@ public final class CarbonProperties {
    * if parameter is invalid current version will be set
    */
   private void validateCarbonDataFileVersion() {
-    short carbondataFileVersion = 
CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION;
     String carbondataFileVersionString =
         
carbonProperties.getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
-    try {
-      carbondataFileVersion = Short.parseShort(carbondataFileVersionString);
-    } catch (NumberFormatException e) {
-      carbondataFileVersion = 
CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION;
-      LOGGER.info("Current Data file version property is invalid  \"" + 
carbondataFileVersionString
-          + "\" is invalid. Using the Current data file version value \"" + 
carbondataFileVersion);
-      carbonProperties
-          .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, 
carbondataFileVersion + "");
-    }
-    if (carbondataFileVersion > 
CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION
-        || carbondataFileVersion < 0) {
-      LOGGER.info("Current Data file version property is invalid  \"" + 
carbondataFileVersionString
-          + "\" is invalid. Using the Current data file version value \"" + 
carbondataFileVersion);
-      carbondataFileVersion = 
CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION;
+    if (carbondataFileVersionString == null) {
+      // use default property if user does not specify version property
       carbonProperties
-          .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, 
carbondataFileVersion + "");
+          .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+              CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
+    } else {
+      try {
+        ColumnarFormatVersion.valueOf(carbondataFileVersionString);
+      } catch (IllegalArgumentException e) {
+        // use default property if user specifies an invalid version property
+        LOGGER.warn("Specified file version property is invalid: " +
+            carbondataFileVersionString + ". Using " +
+            CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION + " as 
default file version");
+        
carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+            CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
+      }
     }
-
   }
 
   /**
@@ -362,7 +361,23 @@ public final class CarbonProperties {
    */
   public void addProperty(String key, String value) {
     carbonProperties.setProperty(key, value);
+  }
+
+  private ColumnarFormatVersion getDefaultFormatVersion() {
+    return 
ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
+  }
 
+  public ColumnarFormatVersion getFormatVersion() {
+    String versionStr = 
getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
+    if (versionStr == null) {
+      return getDefaultFormatVersion();
+    } else {
+      try {
+        return ColumnarFormatVersion.valueOf(versionStr);
+      } catch (IllegalArgumentException e) {
+        return getDefaultFormatVersion();
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 162e9b9..41594ad 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1056,11 +1056,6 @@ public final class CarbonUtil {
 
   /**
    * Below method will be used to read the data file matadata
-   *
-   * @param filePath file path
-   * @param blockOffset   offset in the file
-   * @return Data file metadata instance
-   * @throws CarbonUtilException
    */
   public static DataFileFooter readMetadatFile(TableBlockInfo tableBlockInfo)
       throws CarbonUtilException {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
 
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
index ea1324e..e766e85 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
@@ -55,7 +56,7 @@ public class DataFileFooterConverter extends 
AbstractDataFileFooterConverter {
       CarbonFooterReader reader =
           new CarbonFooterReader(tableBlockInfo.getFilePath(), 
actualFooterOffset);
       FileFooter footer = reader.readFooter();
-      dataFileFooter.setVersionId((short) footer.getVersion());
+      dataFileFooter.setVersionId(ColumnarFormatVersion.valueOf((short) 
footer.getVersion()));
       dataFileFooter.setNumberOfRows(footer.getNum_rows());
       dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));
       List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
 
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
index d971756..02de383 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
@@ -46,7 +47,7 @@ public class DataFileFooterConverter2 extends 
AbstractDataFileFooterConverter {
     CarbonFooterReader reader =
         new CarbonFooterReader(tableBlockInfo.getFilePath(), 
tableBlockInfo.getBlockOffset());
     FileFooter footer = reader.readFooter();
-    dataFileFooter.setVersionId((short) footer.getVersion());
+    dataFileFooter.setVersionId(ColumnarFormatVersion.valueOf((short) 
footer.getVersion()));
     dataFileFooter.setNumberOfRows(footer.getNum_rows());
     dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));
     List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
index a079ad7..175a22b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.carbondata.core.util;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 
 /**
  * Factory class to get the thrift reader object based on version
@@ -49,15 +50,18 @@ public class DataFileFooterConverterFactory {
   /**
    * Method will be used to get the file footer converter instance based on 
version
    *
-   * @param versionNumber
+   * @param version
    * @return footer reader instance
    */
-  public AbstractDataFileFooterConverter getDataFileFooterConverter(final 
short versionNumber) {
-    switch (versionNumber) {
-      case 2:
+  public AbstractDataFileFooterConverter getDataFileFooterConverter(
+      final ColumnarFormatVersion version) {
+    switch (version) {
+      case V2:
         return new DataFileFooterConverter2();
-      default:
+      case V1:
         return new DataFileFooterConverter();
+      default:
+        throw new IllegalArgumentException("invalid format version: " + 
version);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
 
b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
index eabf688..6d90a36 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.carbondata.core.carbon.datastore.block;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -28,7 +29,7 @@ public class BlockInfoTest {
   static BlockInfo blockInfo;
 
   @BeforeClass public static void setup() {
-    blockInfo = new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", 
null, 6,(short)1));
+    blockInfo = new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", 
null, 6, ColumnarFormatVersion.V1));
   }
 
   @Test public void hashCodeTest() {
@@ -44,7 +45,7 @@ public class BlockInfoTest {
 
   @Test public void equalsTestWithSimilarObject() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6, 
(short)1));
+        new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6, 
ColumnarFormatVersion.V1));
     Boolean res = blockInfo.equals(blockInfoTest);
     assert (res);
   }
@@ -61,28 +62,28 @@ public class BlockInfoTest {
 
   @Test public void equalsTestWithDifferentSegmentId() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("filePath", 6, "diffSegmentId", null, 
6,(short)1));
+        new BlockInfo(new TableBlockInfo("filePath", 6, "diffSegmentId", null, 
6, ColumnarFormatVersion.V1));
     Boolean res = blockInfo.equals(blockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDifferentOffset() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("filePath", 62, "segmentId", null, 6, 
(short)1));
+        new BlockInfo(new TableBlockInfo("filePath", 62, "segmentId", null, 6, 
ColumnarFormatVersion.V1));
     Boolean res = blockInfo.equals(blockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDifferentBlockLength() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 62, 
(short)1));
+        new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 62, 
ColumnarFormatVersion.V1));
     Boolean res = blockInfo.equals(blockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDiffFilePath() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("diffFilePath", 6, "segmentId", null, 
62, (short)1));
+        new BlockInfo(new TableBlockInfo("diffFilePath", 6, "segmentId", null, 
62, ColumnarFormatVersion.V1));
     Boolean res = blockInfoTest.equals(blockInfo);
     assert (!res);
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
 
b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
index 1b49f83..b6669ed 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.core.carbon.datastore.block;
 import mockit.Mock;
 import mockit.MockUp;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 
 import org.junit.BeforeClass;
@@ -34,8 +35,8 @@ public class TableBlockInfoTest {
   static TableBlockInfo tableBlockInfos;
 
   @BeforeClass public static void setup() {
-    tableBlockInfo = new TableBlockInfo("filePath", 4, "segmentId", null, 6, 
(short) 1);
-    tableBlockInfos = new TableBlockInfo("filepath", 6, "5", null, 6, new 
BlockletInfos(6, 2, 2), (short) 1);
+    tableBlockInfo = new TableBlockInfo("filePath", 4, "segmentId", null, 6, 
ColumnarFormatVersion.V1);
+    tableBlockInfos = new TableBlockInfo("filepath", 6, "5", null, 6, new 
BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1);
   }
 
   @Test public void equalTestWithSameObject() {
@@ -44,7 +45,7 @@ public class TableBlockInfoTest {
   }
 
   @Test public void equalTestWithSimilarObject() {
-    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, 
"segmentId", null, 6, (short) 1);
+    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, 
"segmentId", null, 6, ColumnarFormatVersion.V1);
     Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
     assert (res);
   }
@@ -60,52 +61,52 @@ public class TableBlockInfoTest {
   }
 
   @Test public void equlsTestWithDiffSegmentId() {
-    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, 
"diffsegmentId", null, 6, (short) 1);
+    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, 
"diffsegmentId", null, 6, ColumnarFormatVersion.V1);
     Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void equlsTestWithDiffBlockOffset() {
-    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 6, 
"segmentId", null, 6, (short) 1);
+    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 6, 
"segmentId", null, 6, ColumnarFormatVersion.V1);
     Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDiffBlockLength() {
-    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, 
"segmentId", null, 4, (short) 1);
+    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, 
"segmentId", null, 4, ColumnarFormatVersion.V1);
     Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDiffBlockletNumber() {
     TableBlockInfo tableBlockInfoTest =
-        new TableBlockInfo("filepath", 6, "segmentId", null, 6, new 
BlockletInfos(6, 3, 2), (short) 1);
+        new TableBlockInfo("filepath", 6, "segmentId", null, 6, new 
BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
     Boolean res = tableBlockInfos.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDiffFilePath() {
     TableBlockInfo tableBlockInfoTest =
-        new TableBlockInfo("difffilepath", 6, "segmentId", null, 6, new 
BlockletInfos(6, 3, 2), (short) 1);
+        new TableBlockInfo("difffilepath", 6, "segmentId", null, 6, new 
BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
     Boolean res = tableBlockInfos.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void compareToTestForSegmentId() {
     TableBlockInfo tableBlockInfo =
-        new TableBlockInfo("difffilepath", 6, "5", null, 6, new 
BlockletInfos(6, 3, 2), (short) 1);
+        new TableBlockInfo("difffilepath", 6, "5", null, 6, new 
BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
     int res = tableBlockInfos.compareTo(tableBlockInfo);
     int expectedResult = 2;
     assertEquals(res, expectedResult);
 
     TableBlockInfo tableBlockInfo1 =
-        new TableBlockInfo("difffilepath", 6, "6", null, 6, new 
BlockletInfos(6, 3, 2), (short) 1);
+        new TableBlockInfo("difffilepath", 6, "6", null, 6, new 
BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
     int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
     int expectedResult1 = -1;
     assertEquals(res1, expectedResult1);
 
     TableBlockInfo tableBlockInfo2 =
-        new TableBlockInfo("difffilepath", 6, "4", null, 6, new 
BlockletInfos(6, 3, 2), (short) 1);
+        new TableBlockInfo("difffilepath", 6, "4", null, 6, new 
BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
     int res2 = tableBlockInfos.compareTo(tableBlockInfo2);
     int expectedresult2 = 1;
     assertEquals(res2, expectedresult2);
@@ -130,18 +131,18 @@ public class TableBlockInfoTest {
 
     };
 
-    TableBlockInfo tableBlockInfo = new TableBlockInfo("difffilepaths", 6, 
"5", null, 3, (short) 1);
+    TableBlockInfo tableBlockInfo = new TableBlockInfo("difffilepaths", 6, 
"5", null, 3, ColumnarFormatVersion.V1);
     int res = tableBlockInfos.compareTo(tableBlockInfo);
     int expectedResult = -5;
     assertEquals(res, expectedResult);
 
-    TableBlockInfo tableBlockInfo1 = new TableBlockInfo("filepath", 6, "5", 
null, 3, (short) 1);
+    TableBlockInfo tableBlockInfo1 = new TableBlockInfo("filepath", 6, "5", 
null, 3, ColumnarFormatVersion.V1);
     int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
     int expectedResult1 = 1;
     assertEquals(res1, expectedResult1);
 
     TableBlockInfo tableBlockInfoTest =
-        new TableBlockInfo("filePath", 6, "5", null, 7, new BlockletInfos(6, 
2, 2), (short) 1);
+        new TableBlockInfo("filePath", 6, "5", null, 7, new BlockletInfos(6, 
2, 2), ColumnarFormatVersion.V1);
     int res2 = tableBlockInfos.compareTo(tableBlockInfoTest);
     int expectedResult2 = -1;
     assertEquals(res2, expectedResult2);
@@ -149,13 +150,13 @@ public class TableBlockInfoTest {
 
   @Test public void compareToTestWithStartBlockletNo() {
     TableBlockInfo tableBlockInfo =
-        new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 
3, 2), (short) 1);
+        new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 
3, 2), ColumnarFormatVersion.V1);
     int res = tableBlockInfos.compareTo(tableBlockInfo);
     int expectedresult =-1;
     assertEquals(res, expectedresult);
 
     TableBlockInfo tableBlockInfo1 =
-        new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 
1, 2), (short) 1);
+        new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 
1, 2), ColumnarFormatVersion.V1);
     int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
     int expectedresult1 = 1;
     assertEquals(res1, expectedresult1);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
 
b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
index 83b62a5..e9d09b8 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.carbondata.core.carbon.datastore.block;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -34,10 +35,10 @@ public class TableTaskInfoTest {
     tableBlockInfoList = new ArrayList<>(5);
 
     String[] locations = { "loc1", "loc2", "loc3" };
-    tableBlockInfoList.add(0, new TableBlockInfo("filePath", 2, "segmentID", 
locations, 6, (short) 1));
+    tableBlockInfoList.add(0, new TableBlockInfo("filePath", 2, "segmentID", 
locations, 6, ColumnarFormatVersion.V1));
 
     String[] locs = { "loc4", "loc5" };
-    tableBlockInfoList.add(1, new TableBlockInfo("filepath", 2, "segmentId", 
locs, 6, (short) 1));
+    tableBlockInfoList.add(1, new TableBlockInfo("filepath", 2, "segmentId", 
locs, 6, ColumnarFormatVersion.V1));
 
     tableTaskInfo = new TableTaskInfo("taskId", tableBlockInfoList);
   }
@@ -68,10 +69,10 @@ public class TableTaskInfoTest {
     List<TableBlockInfo> tableBlockInfoListTest = new ArrayList<>();
 
     String[] locations = { "loc1", "loc2", "loc3" };
-    tableBlockInfoListTest.add(0, new TableBlockInfo("filePath", 2, 
"segmentID", locations, 6, (short) 1));
+    tableBlockInfoListTest.add(0, new TableBlockInfo("filePath", 2, 
"segmentID", locations, 6, ColumnarFormatVersion.V1));
 
     String[] locations1 = { "loc1", "loc2", "loc3" };
-    tableBlockInfoListTest.add(1, new TableBlockInfo("filePath", 2, 
"segmentID", locations1, 6, (short) 1));
+    tableBlockInfoListTest.add(1, new TableBlockInfo("filePath", 2, 
"segmentID", locations1, 6, ColumnarFormatVersion.V1));
 
     List<String> res = TableTaskInfo.maxNoNodes(tableBlockInfoListTest);
     assert (res.equals(locs));

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
 
b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index d959a5c..be270e4 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -166,7 +166,7 @@ public class CarbonMetadataUtilTest {
     segmentInfo.setNum_cols(0);
     
segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(columnCardinality));
     IndexHeader indexHeader = new IndexHeader();
-    
indexHeader.setVersion(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
+    indexHeader.setVersion(2);
     indexHeader.setSegment_info(segmentInfo);
     indexHeader.setTable_columns(columnSchemaList);
     IndexHeader indexheaderResult = getIndexHeader(columnCardinality, 
columnSchemaList);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java 
b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index c0d890c..869c8cc 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.core.util;
 import mockit.Mock;
 import mockit.MockUp;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import 
org.apache.carbondata.core.carbon.datastore.chunk.DimensionChunkAttributes;
 import 
org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
@@ -555,18 +556,18 @@ public class CarbonUtilTest {
       @SuppressWarnings("unused") @Mock
       public DataFileFooter readDataFileFooter(TableBlockInfo info) {
         DataFileFooter fileFooter = new DataFileFooter();
-        fileFooter.setVersionId((short)1);
+        fileFooter.setVersionId(ColumnarFormatVersion.V1);
         return fileFooter;
       }
     };
-    TableBlockInfo info = new TableBlockInfo("file:/", 1, "0", new String[0], 
1, (short)1);
+    TableBlockInfo info = new TableBlockInfo("file:/", 1, "0", new String[0], 
1, ColumnarFormatVersion.V1);
     
-    assertEquals(CarbonUtil.readMetadatFile(info).getVersionId(), 1);
+    assertEquals(CarbonUtil.readMetadatFile(info).getVersionId().number(), 1);
   }
 
   @Test(expected = CarbonUtilException.class) public void 
testToReadMetadatFileWithException()
       throws Exception {
-       TableBlockInfo info = new TableBlockInfo("file:/", 1, "0", new 
String[0], 1, (short)1);
+       TableBlockInfo info = new TableBlockInfo("file:/", 1, "0", new 
String[0], 1, ColumnarFormatVersion.V1);
     CarbonUtil.readMetadatFile(info);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
 
b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
index 62d1ac7..b7c48d7 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
@@ -22,6 +22,7 @@ package org.apache.carbondata.core.util;
 import mockit.Mock;
 import mockit.MockUp;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.BlockInfo;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
@@ -152,7 +153,7 @@ public class DataFileFooterConverterTest {
       }
     };
     String[] arr = { "a", "b", "c" };
-    TableBlockInfo tableBlockInfo = new TableBlockInfo("file", 3, "id", arr, 
3, (short) 1);
+    TableBlockInfo tableBlockInfo = new TableBlockInfo("file", 3, "id", arr, 
3, ColumnarFormatVersion.V1);
     tableBlockInfo.getBlockletInfos().setNoOfBlockLets(3);
     List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
     tableBlockInfoList.add(tableBlockInfo);
@@ -254,7 +255,7 @@ public class DataFileFooterConverterTest {
     segmentInfo.setNumberOfColumns(segmentInfo1.getNum_cols());
     dataFileFooter.setNumberOfRows(3);
     dataFileFooter.setSegmentInfo(segmentInfo);
-    TableBlockInfo info = new TableBlockInfo("file", 1, "0", new String[0], 1, 
(short)1);
+    TableBlockInfo info = new TableBlockInfo("file", 1, "0", new String[0], 1, 
ColumnarFormatVersion.V1);
     DataFileFooter result = dataFileFooterConverter.readDataFileFooter(info);
     assertEquals(result.getNumberOfRows(), 3);
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 9102c78..75fdd1c 100644
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -26,7 +26,7 @@ object CarbonExample {
 
   def main(args: Array[String]): Unit = {
     // to run the example, plz change this path to your local machine path
-    val rootPath = "/Users/wangfei/code/incubator-carbondata"
+    val rootPath = "/Users/jackylk/code/incubator-carbondata"
     val spark = SparkSession
         .builder()
         .master("local")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 8b453c7..e707c4e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.DataRefNode;
 import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder;
 import org.apache.carbondata.core.carbon.datastore.IndexKey;
@@ -254,8 +255,9 @@ public class CarbonInputFormat<T> extends 
FileInputFormat<Void, T> {
       if (segmentId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
         continue;
       }
-      carbonSplits.add(CarbonInputSplit
-          .from(segmentId, fileSplit, 
CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION));
+      carbonSplits.add(CarbonInputSplit.from(segmentId, fileSplit,
+              ColumnarFormatVersion.valueOf(
+                  CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION)));
     }
     return carbonSplits;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index efc4f77..8b87cad 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -25,11 +25,12 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.carbon.datastore.block.Distributable;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.hadoop.internal.index.Block;
 
 import org.apache.hadoop.fs.Path;
@@ -55,17 +56,18 @@ public class CarbonInputSplit extends FileSplit
    */
   private int numberOfBlocklets;
 
-  private short version = 
CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION;
+  private ColumnarFormatVersion version;
 
   public CarbonInputSplit() {
     segmentId = null;
     taskId = "0";
     numberOfBlocklets = 0;
     invalidSegments = new ArrayList<>();
+    version = CarbonProperties.getInstance().getFormatVersion();
   }
 
   private CarbonInputSplit(String segmentId, Path path, long start, long 
length, String[] locations,
-      short version) {
+      ColumnarFormatVersion version) {
     super(path, start, length, locations);
     this.segmentId = segmentId;
     this.taskId = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
@@ -74,12 +76,13 @@ public class CarbonInputSplit extends FileSplit
   }
 
   public CarbonInputSplit(String segmentId, Path path, long start, long 
length, String[] locations,
-      int numberOfBlocklets, short version) {
+      int numberOfBlocklets, ColumnarFormatVersion version) {
     this(segmentId, path, start, length, locations, version);
     this.numberOfBlocklets = numberOfBlocklets;
   }
 
-  public static CarbonInputSplit from(String segmentId, FileSplit split, short 
version)
+  public static CarbonInputSplit from(String segmentId, FileSplit split,
+      ColumnarFormatVersion version)
       throws IOException {
     return new CarbonInputSplit(segmentId, split.getPath(), split.getStart(), 
split.getLength(),
         split.getLocations(), version);
@@ -120,7 +123,7 @@ public class CarbonInputSplit extends FileSplit
   @Override public void readFields(DataInput in) throws IOException {
     super.readFields(in);
     this.segmentId = in.readUTF();
-    this.version = in.readShort();
+    this.version = ColumnarFormatVersion.valueOf(in.readShort());
     int numInvalidSegment = in.readInt();
     invalidSegments = new ArrayList<>(numInvalidSegment);
     for (int i = 0; i < numInvalidSegment; i++) {
@@ -131,7 +134,7 @@ public class CarbonInputSplit extends FileSplit
   @Override public void write(DataOutput out) throws IOException {
     super.write(out);
     out.writeUTF(segmentId);
-    out.writeShort(version);
+    out.writeShort(version.number());
     out.writeInt(invalidSegments.size());
     for (String invalidSegment : invalidSegments) {
       out.writeUTF(invalidSegment);
@@ -155,11 +158,11 @@ public class CarbonInputSplit extends FileSplit
     return numberOfBlocklets;
   }
 
-  public short getVersion() {
+  public ColumnarFormatVersion getVersion() {
     return version;
   }
 
-  public void setVersion(short version) {
+  public void setVersion(ColumnarFormatVersion version) {
     this.version = version;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1801408..e5eb78a 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, 
CarbonTableIdentifier}
+import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, 
CarbonTableIdentifier, ColumnarFormatVersion}
 import org.apache.carbondata.core.carbon.datastore.block.{Distributable, 
TableBlockInfo}
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -801,14 +801,13 @@ object CarbonDataRDDFactory {
           val jobContext = new Job(hadoopConfiguration)
           val rawSplits = inputFormat.getSplits(jobContext).toArray
           val result = new Array[Partition](rawSplits.size)
-          val blockList = rawSplits.map(inputSplit => {
+          val blockList = rawSplits.map { inputSplit =>
             val fileSplit = inputSplit.asInstanceOf[FileSplit]
             new TableBlockInfo(fileSplit.getPath.toString,
               fileSplit.getStart, "1",
-              fileSplit.getLocations, fileSplit.getLength, 0
+              fileSplit.getLocations, fileSplit.getLength, 
ColumnarFormatVersion.V1
             ).asInstanceOf[Distributable]
           }
-          )
           // group blocks to nodes, tasks
           val startTime = System.currentTimeMillis
           val activeNodes = DistributionUtil

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
index 431180c..749a6e8 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
@@ -32,7 +32,7 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants
  */
 class TestQueryWithOldCarbonDataFile extends QueryTest with BeforeAndAfterAll {
   override def beforeAll {
-         
CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
 "1");         
+         
CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
 "V1");
     sql("drop table if exists OldFormatTable")
     sql("drop table if exists OldFormatTableHIVE")
      sql("""
@@ -47,14 +47,14 @@ class TestQueryWithOldCarbonDataFile extends QueryTest with 
BeforeAndAfterAll {
            name String, phonetype String, serialname String, salary Int)
           row format delimited fields terminated by ','
            """)      
-    sql("LOAD DATA local inpath './src/test/resources/OLDFORMATTABLE.csv' INTO 
table OldFormatTable");       
+    sql("LOAD DATA local inpath './src/test/resources/OLDFORMATTABLE.csv' INTO 
table OldFormatTable")
    sql(s"""
            LOAD DATA LOCAL INPATH 
'./src/test/resources/OLDFORMATTABLEHIVE.csv' into table OldFormatTableHIVE
            """)
 
   }
 
-  
CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
 "2");
+  
CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
 "V2")
   test("Test select * query") {
     checkAnswer(
       sql("select * from OldFormatTable"), sql("select * from 
OldFormatTableHIVE")
@@ -62,7 +62,7 @@ class TestQueryWithOldCarbonDataFile extends QueryTest with 
BeforeAndAfterAll {
   }
 
   override def afterAll {
-     
CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
 "1");
+     
CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
 "V1")
     sql("drop table if exists OldFormatTable")
     sql("drop table if exists OldFormatTableHIVE")
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index cfae186..21c0fa7 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, 
CarbonTableIdentifier}
+import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, 
CarbonTableIdentifier, ColumnarFormatVersion}
 import org.apache.carbondata.core.carbon.datastore.block.{Distributable, 
TableBlockInfo}
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
@@ -810,14 +810,13 @@ object CarbonDataRDDFactory {
           }
           val jobContext = new Job(hadoopConfiguration)
           val rawSplits = inputFormat.getSplits(jobContext).toArray
-          val blockList = rawSplits.map(inputSplit => {
+          val blockList = rawSplits.map { inputSplit =>
             val fileSplit = inputSplit.asInstanceOf[FileSplit]
             new TableBlockInfo(fileSplit.getPath.toString,
               fileSplit.getStart, "1",
-              fileSplit.getLocations, fileSplit.getLength
+              fileSplit.getLocations, fileSplit.getLength, 
ColumnarFormatVersion.V1
             ).asInstanceOf[Distributable]
           }
-          )
           // group blocks to nodes, tasks
           val startTime = System.currentTimeMillis
           val activeNodes = DistributionUtil

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
index 2fbb00e..047ac0d 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
@@ -16,12 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.carbondata.processing.store;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
-import org.apache.carbondata.processing.store.writer.CarbonFactDataWriterImpl2;
-import 
org.apache.carbondata.processing.store.writer.CarbonFactDataWriterImplForIntIndexAndAggBlock;
+import 
org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
+import 
org.apache.carbondata.processing.store.writer.v2.CarbonFactDataWriterImplV2;
 
 /**
  * Factory class to get the writer instance
@@ -57,13 +59,15 @@ public class CarbonDataWriterFactory {
    * @param carbonDataWriterVo writer vo object
    * @return writer instance
    */
-  public CarbonFactDataWriter<?> getFactDataWriter(final short version,
+  public CarbonFactDataWriter<?> getFactDataWriter(final ColumnarFormatVersion 
version,
       final CarbonDataWriterVo carbonDataWriterVo) {
     switch (version) {
-      case 2:
-        return new CarbonFactDataWriterImpl2(carbonDataWriterVo);
+      case V2:
+        return new CarbonFactDataWriterImplV2(carbonDataWriterVo);
+      case V1:
+        return new CarbonFactDataWriterImplV1(carbonDataWriterVo);
       default:
-        return new 
CarbonFactDataWriterImplForIntIndexAndAggBlock(carbonDataWriterVo);
+        throw new IllegalArgumentException("invalid format version: " + 
version);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index e560784..c961700 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
@@ -1400,8 +1401,7 @@ public class CarbonFactDataHandlerColumnar implements 
CarbonFactHandler {
    * @return data writer instance
    */
   private CarbonFactDataWriter<?> getFactDataWriter(int[] keyBlockSize) {
-    short version = Short.parseShort(
-        
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
+    ColumnarFormatVersion version = 
CarbonProperties.getInstance().getFormatVersion();
     return CarbonDataWriterFactory.getInstance()
         .getFactDataWriter(version, getDataWriterVo(keyBlockSize));
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java
deleted file mode 100644
index d399280..0000000
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.processing.store.writer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
-import org.apache.carbondata.core.util.CarbonMetadataUtil;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.writer.CarbonFooterWriter;
-import org.apache.carbondata.format.DataChunk2;
-import org.apache.carbondata.format.FileFooter;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
-
-/**
- * Below method will be used to write the data in version 2 format
- */
-public class CarbonFactDataWriterImpl2 extends CarbonFactDataWriterImplForIntIndexAndAggBlock {
-
-  /**
-   * logger
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonFactDataWriterImpl2.class.getName());
-
-  /**
-   * Constructor create instance of this class
-   *
-   * @param dataWriterVo
-   */
-  public CarbonFactDataWriterImpl2(CarbonDataWriterVo dataWriterVo) {
-    super(dataWriterVo);
-  }
-
-  /**
-   * Below method will be used to write the data to carbon data file
-   *
-   * @param holder
-   * @throws CarbonDataWriterException any problem in writing operation
-   */
-  @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
-    // size to calculate the size of the blocklet
-    int size = 0;
-    // get the blocklet info object
-    BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, 0);
-
-    List<DataChunk2> datachunks = null;
-    try {
-      // get all the data chunks
-      datachunks = CarbonMetadataUtil
-          .getDatachunk2(blockletInfo, thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem while getting the data chunks", e);
-    }
-    // data chunk byte array
-    byte[][] dataChunkByteArray = new byte[datachunks.size()][];
-    for (int i = 0; i < dataChunkByteArray.length; i++) {
-      dataChunkByteArray[i] = CarbonUtil.getByteArray(datachunks.get(i));
-      // add the data chunk size
-      size += dataChunkByteArray[i].length;
-    }
-    // add row id index length
-    for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
-      size += holder.getKeyBlockIndexLength()[i];
-    }
-    // add rle index length
-    for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
-      size += holder.getDataIndexMapLength()[i];
-    }
-    // add dimension column data page and measure column data page size
-    long blockletDataSize =
-        holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength() + size;
-    // if size of the file already reached threshold size then create a new file and get the file
-    // channel object
-    updateBlockletFileChannel(blockletDataSize);
-    // writer the version header in the file if current file size is zero
-    // this is done so carbondata file can be read separately
-    try {
-      if (fileChannel.size() == 0) {
-        short version = Short.parseShort(CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
-        byte[] header = (CarbonCommonConstants.CARBON_DATA_VERSION_HEADER + version).getBytes();
-        ByteBuffer buffer = ByteBuffer.allocate(header.length);
-        buffer.put(header);
-        buffer.rewind();
-        fileChannel.write(buffer);
-      }
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem while getting the file channel size", e);
-    }
-    // write data to file and get its offset
-    writeDataToFile(holder, dataChunkByteArray, fileChannel);
-    // add blocklet info to list
-    blockletInfoList.add(blockletInfo);
-    LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
-  }
-
-  /**
-   * Below method will be used to write the data to file
-   * Data Format
-   * <DColumn1DataChunk><DColumnDataPage><DColumnRle>
-   * <DColumn2DataChunk><DColumn2DataPage><DColumn2RowIds><DColumn2Rle>
-   * <DColumn3DataChunk><DColumn3DataPage><column3RowIds>
-   * <MColumn1DataChunk><MColumn1DataPage>
-   * <MColumn2DataChunk><MColumn2DataPage>
-   * <MColumn2DataChunk><MColumn2DataPage>
-   *
-   * @param nodeHolder
-   * @param dataChunksBytes
-   * @param channel
-   * @throws CarbonDataWriterException
-   */
-  private void writeDataToFile(NodeHolder nodeHolder, byte[][] dataChunksBytes, FileChannel channel)
-      throws CarbonDataWriterException {
-    long offset = 0;
-    try {
-      offset = channel.size();
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem while getting the file channel size");
-    }
-    List<Long> currentDataChunksOffset = new ArrayList<>();
-    List<Short> currentDataChunksLength = new ArrayList<>();
-    dataChunksLength.add(currentDataChunksLength);
-    dataChunksOffsets.add(currentDataChunksOffset);
-    int bufferSize = 0;
-    int rowIdIndex = 0;
-    int rleIndex = 0;
-    for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
-      currentDataChunksOffset.add(offset);
-      currentDataChunksLength.add((short) dataChunksBytes[i].length);
-      bufferSize += dataChunksBytes[i].length + nodeHolder.getKeyLengths()[i] + (!nodeHolder
-          .getIsSortedKeyBlock()[i] ? nodeHolder.getKeyBlockIndexLength()[rowIdIndex] : 0) + (
-          dataWriterVo.getAggBlocks()[i] ?
-              nodeHolder.getCompressedDataIndex()[rleIndex].length :
-              0);
-      offset += dataChunksBytes[i].length;
-      offset += nodeHolder.getKeyLengths()[i];
-      if (!nodeHolder.getIsSortedKeyBlock()[i]) {
-        offset += nodeHolder.getKeyBlockIndexLength()[rowIdIndex];
-        rowIdIndex++;
-      }
-      if (dataWriterVo.getAggBlocks()[i]) {
-        offset += nodeHolder.getDataIndexMapLength()[rleIndex];
-        rleIndex++;
-      }
-    }
-    ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
-    rleIndex = 0;
-    rowIdIndex = 0;
-    for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
-      buffer.put(dataChunksBytes[i]);
-      buffer.put(nodeHolder.getKeyArray()[i]);
-      if (!nodeHolder.getIsSortedKeyBlock()[i]) {
-        buffer.putInt(nodeHolder.getCompressedIndex()[rowIdIndex].length);
-        buffer.put(nodeHolder.getCompressedIndex()[rowIdIndex]);
-        if (nodeHolder.getCompressedIndexMap()[rowIdIndex].length > 0) {
-          buffer.put(nodeHolder.getCompressedIndexMap()[rowIdIndex]);
-        }
-        rowIdIndex++;
-      }
-      if (dataWriterVo.getAggBlocks()[i]) {
-        buffer.put(nodeHolder.getCompressedDataIndex()[rleIndex]);
-        rleIndex++;
-      }
-    }
-    try {
-      buffer.flip();
-      channel.write(buffer);
-    } catch (IOException e) {
-      throw new CarbonDataWriterException(
-          "Problem while writing the dimension data in carbon data file", e);
-    }
-
-    int dataChunkIndex = nodeHolder.getKeyArray().length;
-    int totalLength = 0;
-    for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
-      currentDataChunksOffset.add(offset);
-      currentDataChunksLength.add((short) dataChunksBytes[dataChunkIndex].length);
-      offset += dataChunksBytes[dataChunkIndex].length;
-      offset += nodeHolder.getDataArray()[i].length;
-      totalLength += dataChunksBytes[dataChunkIndex].length;
-      totalLength += nodeHolder.getDataArray()[i].length;
-      dataChunkIndex++;
-    }
-    buffer = ByteBuffer.allocate(totalLength);
-    dataChunkIndex = nodeHolder.getKeyArray().length;
-    for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
-      buffer.put(dataChunksBytes[dataChunkIndex++]);
-      buffer.put(nodeHolder.getDataArray()[i]);
-    }
-    try {
-      buffer.flip();
-      channel.write(buffer);
-    } catch (IOException e) {
-      throw new CarbonDataWriterException(
-          "Problem while writing the measure data in carbon data file", e);
-    }
-  }
-
-  /**
-   * This method will be used to get the blocklet metadata
-   *
-   * @return BlockletInfo - blocklet metadata
-   */
-  protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
-    // create the info object for leaf entry
-    BlockletInfoColumnar info = new BlockletInfoColumnar();
-    //add aggBlocks array
-    info.setAggKeyBlock(nodeHolder.getAggBlocks());
-    // add total entry count
-    info.setNumberOfKeys(nodeHolder.getEntryCount());
-
-    // add the key array length
-    info.setKeyLengths(nodeHolder.getKeyLengths());
-    // adding null measure index bit set
-    info.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex());
-    //add column min max length
-    info.setColumnMaxData(nodeHolder.getColumnMaxData());
-    info.setColumnMinData(nodeHolder.getColumnMinData());
-
-    // add measure length
-    info.setMeasureLength(nodeHolder.getMeasureLenght());
-
-    info.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock());
-    info.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength());
-    info.setDataIndexMapLength(nodeHolder.getDataIndexMapLength());
-    // set startkey
-    info.setStartKey(nodeHolder.getStartKey());
-    // set end key
-    info.setEndKey(nodeHolder.getEndKey());
-    info.setCompressionModel(nodeHolder.getCompressionModel());
-    // return leaf metadata
-
-    //colGroup Blocks
-    info.setColGrpBlocks(nodeHolder.getColGrpBlocks());
-
-    return info;
-  }
-
-  /**
-   * This method will write metadata at the end of file file format in thrift format
-   */
-  protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
-      String filePath) throws CarbonDataWriterException {
-    try {
-      // get the current file position
-      long currentPosition = channel.size();
-      CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
-      // get thrift file footer instance
-      FileFooter convertFileMeta = CarbonMetadataUtil
-          .convertFilterFooter2(infoList, localCardinality, thriftColumnSchemaList,
-              dataChunksOffsets, dataChunksLength);
-      // fill the carbon index details
-      fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
-      // write the footer
-      writer.writeFooter(convertFileMeta, currentPosition);
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
-    }
-  }
-}

Reply via email to