Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 90bc36699 -> 5406cee1b


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
deleted file mode 100644
index 8c2608b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.processing.store.writer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.columnar.IndexStorage;
-import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
-import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
-import org.apache.carbondata.core.util.CarbonMetadataUtil;
-import org.apache.carbondata.core.writer.CarbonFooterWriter;
-import org.apache.carbondata.format.FileFooter;
-import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
-
-public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFactDataWriter<int[]> {
-
-  private static final LogService LOGGER = LogServiceFactory
-      .getLogService(CarbonFactDataWriterImplForIntIndexAndAggBlock.class.getName());
-
-  public CarbonFactDataWriterImplForIntIndexAndAggBlock(CarbonDataWriterVo dataWriterVo) {
-    super(dataWriterVo);
-  }
-
-  @Override
-  public NodeHolder buildDataNodeHolder(IndexStorage<int[]>[] keyStorageArray, byte[][] dataArray,
-      int entryCount, byte[] startKey, byte[] endKey, ValueCompressionModel compressionModel,
-      byte[] noDictionaryStartKey, byte[] noDictionaryEndKey) throws CarbonDataWriterException {
-    // if there are no NO-Dictionary column present in the table then
-    // set the empty byte array
-    if (null == noDictionaryEndKey) {
-      noDictionaryEndKey = new byte[0];
-    }
-    if (null == noDictionaryStartKey) {
-      noDictionaryStartKey = new byte[0];
-    }
-    // total measure length;
-    int totalMsrArrySize = 0;
-    // current measure length;
-    int currentMsrLenght = 0;
-    int totalKeySize = 0;
-    int keyBlockSize = 0;
-
-    boolean[] isSortedData = new boolean[keyStorageArray.length];
-    int[] keyLengths = new int[keyStorageArray.length];
-
-    //below will calculate min and max value for each column
-    //for below 2d array, first index will be for column and second will be min max
-    // value for same column
-    //    byte[][] columnMinMaxData = new byte[keyStorageArray.length][];
-
-    byte[][] allMinValue = new byte[keyStorageArray.length][];
-    byte[][] allMaxValue = new byte[keyStorageArray.length][];
-    byte[][] keyBlockData = fillAndCompressedKeyBlockData(keyStorageArray, entryCount);
-    boolean[] colGrpBlock = new boolean[keyStorageArray.length];
-
-    for (int i = 0; i < keyLengths.length; i++) {
-      keyLengths[i] = keyBlockData[i].length;
-      isSortedData[i] = keyStorageArray[i].isAlreadySorted();
-      if (!isSortedData[i]) {
-        keyBlockSize++;
-
-      }
-      totalKeySize += keyLengths[i];
-      if (dataWriterVo.getIsComplexType()[i] || dataWriterVo.getIsDictionaryColumn()[i]) {
-        allMinValue[i] = keyStorageArray[i].getMin();
-        allMaxValue[i] = keyStorageArray[i].getMax();
-      } else {
-        allMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMin());
-        allMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMax());
-      }
-      //if keyStorageArray is instance of ColGroupBlockStorage than it's colGroup chunk
-      if (keyStorageArray[i] instanceof ColGroupBlockStorage) {
-        colGrpBlock[i] = true;
-      }
-    }
-    int[] keyBlockIdxLengths = new int[keyBlockSize];
-    byte[][] dataAfterCompression = new byte[keyBlockSize][];
-    byte[][] indexMap = new byte[keyBlockSize][];
-    int idx = 0;
-    for (int i = 0; i < isSortedData.length; i++) {
-      if (!isSortedData[i]) {
-        dataAfterCompression[idx] =
-            numberCompressor.compress(keyStorageArray[i].getDataAfterComp());
-        if (null != keyStorageArray[i].getIndexMap()
-            && keyStorageArray[i].getIndexMap().length > 0) {
-          indexMap[idx] = numberCompressor.compress(keyStorageArray[i].getIndexMap());
-        } else {
-          indexMap[idx] = new byte[0];
-        }
-        keyBlockIdxLengths[idx] = (dataAfterCompression[idx].length + indexMap[idx].length)
-            + CarbonCommonConstants.INT_SIZE_IN_BYTE;
-        idx++;
-      }
-    }
-    int compressDataBlockSize = 0;
-    for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
-      if (dataWriterVo.getAggBlocks()[i]) {
-        compressDataBlockSize++;
-      }
-    }
-    byte[][] compressedDataIndex = new byte[compressDataBlockSize][];
-    int[] dataIndexMapLength = new int[compressDataBlockSize];
-    idx = 0;
-    for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
-      if (dataWriterVo.getAggBlocks()[i]) {
-        try {
-          compressedDataIndex[idx] =
-              numberCompressor.compress(keyStorageArray[i].getDataIndexMap());
-          dataIndexMapLength[idx] = compressedDataIndex[idx].length;
-          idx++;
-        } catch (Exception e) {
-          throw new CarbonDataWriterException(e.getMessage());
-        }
-      }
-    }
-
-    int[] msrLength = new int[dataWriterVo.getMeasureCount()];
-    // calculate the total size required for all the measure and get the
-    // each measure size
-    for (int i = 0; i < dataArray.length; i++) {
-      currentMsrLenght = dataArray[i].length;
-      totalMsrArrySize += currentMsrLenght;
-      msrLength[i] = currentMsrLenght;
-    }
-    NodeHolder holder = new NodeHolder();
-    holder.setDataArray(dataArray);
-    holder.setKeyArray(keyBlockData);
-    // end key format will be <length of dictionary key><length of no
-    // dictionary key><DictionaryKey><No Dictionary key>
-    byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(noDictionaryEndKey);
-    ByteBuffer buffer = ByteBuffer.allocate(
-        CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
-            + endKey.length + updatedNoDictionaryEndKey.length);
-    buffer.putInt(endKey.length);
-    buffer.putInt(updatedNoDictionaryEndKey.length);
-    buffer.put(endKey);
-    buffer.put(updatedNoDictionaryEndKey);
-    buffer.rewind();
-    holder.setEndKey(buffer.array());
-    holder.setMeasureLenght(msrLength);
-    byte[] updatedNoDictionaryStartKey = updateNoDictionaryStartAndEndKey(noDictionaryStartKey);
-    // start key format will be <length of dictionary key><length of no
-    // dictionary key><DictionaryKey><No Dictionary key>
-    buffer = ByteBuffer.allocate(
-        CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
-            + startKey.length + updatedNoDictionaryStartKey.length);
-    buffer.putInt(startKey.length);
-    buffer.putInt(updatedNoDictionaryStartKey.length);
-    buffer.put(startKey);
-    buffer.put(updatedNoDictionaryStartKey);
-    buffer.rewind();
-    holder.setStartKey(buffer.array());
-    holder.setEntryCount(entryCount);
-    holder.setKeyLengths(keyLengths);
-    holder.setKeyBlockIndexLength(keyBlockIdxLengths);
-    holder.setIsSortedKeyBlock(isSortedData);
-    holder.setCompressedIndex(dataAfterCompression);
-    holder.setCompressedIndexMap(indexMap);
-    holder.setDataIndexMapLength(dataIndexMapLength);
-    holder.setCompressedDataIndex(compressedDataIndex);
-    holder.setCompressionModel(compressionModel);
-    holder.setTotalDimensionArrayLength(totalKeySize);
-    holder.setTotalMeasureArrayLength(totalMsrArrySize);
-    //setting column min max value
-    holder.setColumnMaxData(allMaxValue);
-    holder.setColumnMinData(allMinValue);
-    holder.setAggBlocks(dataWriterVo.getAggBlocks());
-    holder.setColGrpBlocks(colGrpBlock);
-    return holder;
-  }
-
-  @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
-    int indexBlockSize = 0;
-    for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
-      indexBlockSize += holder.getKeyBlockIndexLength()[i] + CarbonCommonConstants.INT_SIZE_IN_BYTE;
-    }
-
-    for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
-      indexBlockSize += holder.getDataIndexMapLength()[i];
-    }
-
-    long blockletDataSize =
-        holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength()
-            + indexBlockSize;
-    updateBlockletFileChannel(blockletDataSize);
-    // write data to file and get its offset
-    long offset = writeDataToFile(holder, fileChannel);
-    // get the blocklet info for currently added blocklet
-    BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, offset);
-    // add blocklet info to list
-    blockletInfoList.add(blockletInfo);
-    LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
-  }
-
-  /**
-   * This method is responsible for writing blocklet to the data file
-   *
-   * @return file offset offset is the current position of the file
-   * @throws CarbonDataWriterException if will throw CarbonDataWriterException when any thing
-   *                                   goes wrong while while writing the leaf file
-   */
-  private long writeDataToFile(NodeHolder nodeHolder, FileChannel channel)
-      throws CarbonDataWriterException {
-    // create byte buffer
-    byte[][] compressedIndex = nodeHolder.getCompressedIndex();
-    byte[][] compressedIndexMap = nodeHolder.getCompressedIndexMap();
-    byte[][] compressedDataIndex = nodeHolder.getCompressedDataIndex();
-    int indexBlockSize = 0;
-    int index = 0;
-    for (int i = 0; i < nodeHolder.getKeyBlockIndexLength().length; i++) {
-      indexBlockSize +=
-          nodeHolder.getKeyBlockIndexLength()[index++] + CarbonCommonConstants.INT_SIZE_IN_BYTE;
-    }
-
-    for (int i = 0; i < nodeHolder.getDataIndexMapLength().length; i++) {
-      indexBlockSize += nodeHolder.getDataIndexMapLength()[i];
-    }
-    ByteBuffer byteBuffer = ByteBuffer.allocate(
-        nodeHolder.getTotalDimensionArrayLength() + nodeHolder.getTotalMeasureArrayLength()
-            + indexBlockSize);
-    long offset = 0;
-    try {
-      // get the current offset
-      offset = channel.size();
-      // add key array to byte buffer
-      for (int i = 0; i < nodeHolder.getKeyArray().length; i++) {
-        byteBuffer.put(nodeHolder.getKeyArray()[i]);
-      }
-      for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
-        byteBuffer.put(nodeHolder.getDataArray()[i]);
-      }
-      // add measure data array to byte buffer
-
-      ByteBuffer buffer1 = null;
-      for (int i = 0; i < compressedIndex.length; i++) {
-        buffer1 = ByteBuffer.allocate(nodeHolder.getKeyBlockIndexLength()[i]);
-        buffer1.putInt(compressedIndex[i].length);
-        buffer1.put(compressedIndex[i]);
-        if (compressedIndexMap[i].length > 0) {
-          buffer1.put(compressedIndexMap[i]);
-        }
-        buffer1.rewind();
-        byteBuffer.put(buffer1.array());
-
-      }
-      for (int i = 0; i < compressedDataIndex.length; i++) {
-        byteBuffer.put(compressedDataIndex[i]);
-      }
-      byteBuffer.flip();
-      // write data to file
-      channel.write(byteBuffer);
-    } catch (IOException exception) {
-      throw new CarbonDataWriterException("Problem in writing carbon file: ", exception);
-    }
-    // return the offset, this offset will be used while reading the file in
-    // engine side to get from which position to start reading the file
-    return offset;
-  }
-
-  /**
-   * This method will be used to get the blocklet metadata
-   *
-   * @return BlockletInfo - blocklet metadata
-   */
-  protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
-    // create the info object for leaf entry
-    BlockletInfoColumnar info = new BlockletInfoColumnar();
-    //add aggBlocks array
-    info.setAggKeyBlock(nodeHolder.getAggBlocks());
-    // add total entry count
-    info.setNumberOfKeys(nodeHolder.getEntryCount());
-
-    // add the key array length
-    info.setKeyLengths(nodeHolder.getKeyLengths());
-    // adding null measure index bit set
-    info.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex());
-    //add column min max length
-    info.setColumnMaxData(nodeHolder.getColumnMaxData());
-    info.setColumnMinData(nodeHolder.getColumnMinData());
-    long[] keyOffSets = new long[nodeHolder.getKeyLengths().length];
-
-    for (int i = 0; i < keyOffSets.length; i++) {
-      keyOffSets[i] = offset;
-      offset += nodeHolder.getKeyLengths()[i];
-    }
-    // key offset will be 8 bytes from current offset because first 4 bytes
-    // will be for number of entry in leaf, then next 4 bytes will be for
-    // key lenght;
-    //        offset += CarbonCommonConstants.INT_SIZE_IN_BYTE * 2;
-
-    // add key offset
-    info.setKeyOffSets(keyOffSets);
-
-    // add measure length
-    info.setMeasureLength(nodeHolder.getMeasureLenght());
-
-    long[] msrOffset = new long[dataWriterVo.getMeasureCount()];
-
-    for (int i = 0; i < msrOffset.length; i++) {
-      // increment the current offset by 4 bytes because 4 bytes will be
-      // used for measure byte length
-      //            offset += CarbonCommonConstants.INT_SIZE_IN_BYTE;
-      msrOffset[i] = offset;
-      // now increment the offset by adding measure length to get the next
-      // measure offset;
-      offset += nodeHolder.getMeasureLenght()[i];
-    }
-    // add measure offset
-    info.setMeasureOffset(msrOffset);
-    info.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock());
-    info.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength());
-    long[] keyBlockIndexOffsets = new long[nodeHolder.getKeyBlockIndexLength().length];
-    for (int i = 0; i < keyBlockIndexOffsets.length; i++) {
-      keyBlockIndexOffsets[i] = offset;
-      offset += nodeHolder.getKeyBlockIndexLength()[i];
-    }
-    info.setDataIndexMapLength(nodeHolder.getDataIndexMapLength());
-    long[] dataIndexMapOffsets = new long[nodeHolder.getDataIndexMapLength().length];
-    for (int i = 0; i < dataIndexMapOffsets.length; i++) {
-      dataIndexMapOffsets[i] = offset;
-      offset += nodeHolder.getDataIndexMapLength()[i];
-    }
-    info.setDataIndexMapOffsets(dataIndexMapOffsets);
-    info.setKeyBlockIndexOffSets(keyBlockIndexOffsets);
-    // set startkey
-    info.setStartKey(nodeHolder.getStartKey());
-    // set end key
-    info.setEndKey(nodeHolder.getEndKey());
-    info.setCompressionModel(nodeHolder.getCompressionModel());
-    // return leaf metadata
-
-    //colGroup Blocks
-    info.setColGrpBlocks(nodeHolder.getColGrpBlocks());
-
-    return info;
-  }
-
-  /**
-   * This method will write metadata at the end of file file format in thrift format
-   */
-  protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
-      String filePath) throws CarbonDataWriterException {
-    try {
-      long currentPosition = channel.size();
-      CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
-      FileFooter convertFileMeta = CarbonMetadataUtil
-          .convertFileFooter(infoList, localCardinality.length, localCardinality,
-              thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
-      fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
-      writer.writeFooter(convertFileMeta, currentPosition);
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
-    }
-  }
-}
\ No newline at end of file

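Note: the class removed above is not dropped; it reappears below, essentially unchanged, as CarbonFactDataWriterImplV1 in the new v1 package, and CarbonFactDataWriterImplV2 (further below) extends it. A minimal sketch of how a caller might dispatch on the columnar format version, assuming a factory of roughly this shape (the factory class and method name are hypothetical; only the two writer implementations are part of this commit, and the V2 enum constant is assumed):

    import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
    import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
    import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
    import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
    import org.apache.carbondata.processing.store.writer.v2.CarbonFactDataWriterImplV2;

    // Hypothetical version-dispatch factory; V2 inherits V1 behaviour for
    // everything it does not override.
    public final class FactDataWriterFactory {
      private FactDataWriterFactory() { }

      public static AbstractFactDataWriter<int[]> create(ColumnarFormatVersion version,
          CarbonDataWriterVo dataWriterVo) {
        switch (version) {
          case V1:
            return new CarbonFactDataWriterImplV1(dataWriterVo);
          case V2:
            return new CarbonFactDataWriterImplV2(dataWriterVo);
          default:
            throw new IllegalArgumentException("Unsupported format version: " + version);
        }
      }
    }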
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
new file mode 100644
index 0000000..19e781d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.store.writer.v1;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.columnar.IndexStorage;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
+import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.writer.CarbonFooterWriter;
+import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
+import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
+import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
+import org.apache.carbondata.processing.store.writer.NodeHolder;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+
+public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
+
+  private static final LogService LOGGER = LogServiceFactory
+      .getLogService(CarbonFactDataWriterImplV1.class.getName());
+
+  public CarbonFactDataWriterImplV1(CarbonDataWriterVo dataWriterVo) {
+    super(dataWriterVo);
+  }
+
+  @Override
+  public NodeHolder buildDataNodeHolder(IndexStorage<int[]>[] keyStorageArray, byte[][] dataArray,
+      int entryCount, byte[] startKey, byte[] endKey, ValueCompressionModel compressionModel,
+      byte[] noDictionaryStartKey, byte[] noDictionaryEndKey) throws CarbonDataWriterException {
+    // if there are no NO-Dictionary column present in the table then
+    // set the empty byte array
+    if (null == noDictionaryEndKey) {
+      noDictionaryEndKey = new byte[0];
+    }
+    if (null == noDictionaryStartKey) {
+      noDictionaryStartKey = new byte[0];
+    }
+    // total measure length;
+    int totalMsrArrySize = 0;
+    // current measure length;
+    int currentMsrLenght = 0;
+    int totalKeySize = 0;
+    int keyBlockSize = 0;
+
+    boolean[] isSortedData = new boolean[keyStorageArray.length];
+    int[] keyLengths = new int[keyStorageArray.length];
+
+    //below will calculate min and max value for each column
+    //for below 2d array, first index will be for column and second will be min max
+    // value for same column
+    //    byte[][] columnMinMaxData = new byte[keyStorageArray.length][];
+
+    byte[][] allMinValue = new byte[keyStorageArray.length][];
+    byte[][] allMaxValue = new byte[keyStorageArray.length][];
+    byte[][] keyBlockData = fillAndCompressedKeyBlockData(keyStorageArray, entryCount);
+    boolean[] colGrpBlock = new boolean[keyStorageArray.length];
+
+    for (int i = 0; i < keyLengths.length; i++) {
+      keyLengths[i] = keyBlockData[i].length;
+      isSortedData[i] = keyStorageArray[i].isAlreadySorted();
+      if (!isSortedData[i]) {
+        keyBlockSize++;
+
+      }
+      totalKeySize += keyLengths[i];
+      if (dataWriterVo.getIsComplexType()[i] || dataWriterVo.getIsDictionaryColumn()[i]) {
+        allMinValue[i] = keyStorageArray[i].getMin();
+        allMaxValue[i] = keyStorageArray[i].getMax();
+      } else {
+        allMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMin());
+        allMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMax());
+      }
+      //if keyStorageArray is an instance of ColGroupBlockStorage then it's a colGroup chunk
+      if (keyStorageArray[i] instanceof ColGroupBlockStorage) {
+        colGrpBlock[i] = true;
+      }
+    }
+    int[] keyBlockIdxLengths = new int[keyBlockSize];
+    byte[][] dataAfterCompression = new byte[keyBlockSize][];
+    byte[][] indexMap = new byte[keyBlockSize][];
+    int idx = 0;
+    for (int i = 0; i < isSortedData.length; i++) {
+      if (!isSortedData[i]) {
+        dataAfterCompression[idx] =
+            numberCompressor.compress(keyStorageArray[i].getDataAfterComp());
+        if (null != keyStorageArray[i].getIndexMap()
+            && keyStorageArray[i].getIndexMap().length > 0) {
+          indexMap[idx] = numberCompressor.compress(keyStorageArray[i].getIndexMap());
+        } else {
+          indexMap[idx] = new byte[0];
+        }
+        keyBlockIdxLengths[idx] = (dataAfterCompression[idx].length + indexMap[idx].length)
+            + CarbonCommonConstants.INT_SIZE_IN_BYTE;
+        idx++;
+      }
+    }
+    int compressDataBlockSize = 0;
+    for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
+      if (dataWriterVo.getAggBlocks()[i]) {
+        compressDataBlockSize++;
+      }
+    }
+    byte[][] compressedDataIndex = new byte[compressDataBlockSize][];
+    int[] dataIndexMapLength = new int[compressDataBlockSize];
+    idx = 0;
+    for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
+      if (dataWriterVo.getAggBlocks()[i]) {
+        try {
+          compressedDataIndex[idx] =
+              numberCompressor.compress(keyStorageArray[i].getDataIndexMap());
+          dataIndexMapLength[idx] = compressedDataIndex[idx].length;
+          idx++;
+        } catch (Exception e) {
+          throw new CarbonDataWriterException(e.getMessage());
+        }
+      }
+    }
+
+    int[] msrLength = new int[dataWriterVo.getMeasureCount()];
+    // calculate the total size required for all the measure and get the
+    // each measure size
+    for (int i = 0; i < dataArray.length; i++) {
+      currentMsrLenght = dataArray[i].length;
+      totalMsrArrySize += currentMsrLenght;
+      msrLength[i] = currentMsrLenght;
+    }
+    NodeHolder holder = new NodeHolder();
+    holder.setDataArray(dataArray);
+    holder.setKeyArray(keyBlockData);
+    // end key format will be <length of dictionary key><length of no
+    // dictionary key><DictionaryKey><No Dictionary key>
+    byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(noDictionaryEndKey);
+    ByteBuffer buffer = ByteBuffer.allocate(
+        CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
+            + endKey.length + updatedNoDictionaryEndKey.length);
+    buffer.putInt(endKey.length);
+    buffer.putInt(updatedNoDictionaryEndKey.length);
+    buffer.put(endKey);
+    buffer.put(updatedNoDictionaryEndKey);
+    buffer.rewind();
+    holder.setEndKey(buffer.array());
+    holder.setMeasureLenght(msrLength);
+    byte[] updatedNoDictionaryStartKey = updateNoDictionaryStartAndEndKey(noDictionaryStartKey);
+    // start key format will be <length of dictionary key><length of no
+    // dictionary key><DictionaryKey><No Dictionary key>
+    buffer = ByteBuffer.allocate(
+        CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
+            + startKey.length + updatedNoDictionaryStartKey.length);
+    buffer.putInt(startKey.length);
+    buffer.putInt(updatedNoDictionaryStartKey.length);
+    buffer.put(startKey);
+    buffer.put(updatedNoDictionaryStartKey);
+    buffer.rewind();
+    holder.setStartKey(buffer.array());
+    holder.setEntryCount(entryCount);
+    holder.setKeyLengths(keyLengths);
+    holder.setKeyBlockIndexLength(keyBlockIdxLengths);
+    holder.setIsSortedKeyBlock(isSortedData);
+    holder.setCompressedIndex(dataAfterCompression);
+    holder.setCompressedIndexMap(indexMap);
+    holder.setDataIndexMapLength(dataIndexMapLength);
+    holder.setCompressedDataIndex(compressedDataIndex);
+    holder.setCompressionModel(compressionModel);
+    holder.setTotalDimensionArrayLength(totalKeySize);
+    holder.setTotalMeasureArrayLength(totalMsrArrySize);
+    //setting column min max value
+    holder.setColumnMaxData(allMaxValue);
+    holder.setColumnMinData(allMinValue);
+    holder.setAggBlocks(dataWriterVo.getAggBlocks());
+    holder.setColGrpBlocks(colGrpBlock);
+    return holder;
+  }
+
+  @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
+    int indexBlockSize = 0;
+    for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
+      indexBlockSize += holder.getKeyBlockIndexLength()[i] + CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    }
+
+    for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
+      indexBlockSize += holder.getDataIndexMapLength()[i];
+    }
+
+    long blockletDataSize =
+        holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength()
+            + indexBlockSize;
+    updateBlockletFileChannel(blockletDataSize);
+    // write data to file and get its offset
+    long offset = writeDataToFile(holder, fileChannel);
+    // get the blocklet info for currently added blocklet
+    BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, offset);
+    // add blocklet info to list
+    blockletInfoList.add(blockletInfo);
+    LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
+  }
+
+  /**
+   * This method is responsible for writing blocklet to the data file
+   *
+   * @return file offset offset is the current position of the file
+   * @throws CarbonDataWriterException if anything goes wrong
+   *                                   while writing the leaf file
+   */
+  private long writeDataToFile(NodeHolder nodeHolder, FileChannel channel)
+      throws CarbonDataWriterException {
+    // create byte buffer
+    byte[][] compressedIndex = nodeHolder.getCompressedIndex();
+    byte[][] compressedIndexMap = nodeHolder.getCompressedIndexMap();
+    byte[][] compressedDataIndex = nodeHolder.getCompressedDataIndex();
+    int indexBlockSize = 0;
+    int index = 0;
+    for (int i = 0; i < nodeHolder.getKeyBlockIndexLength().length; i++) {
+      indexBlockSize +=
+          nodeHolder.getKeyBlockIndexLength()[index++] + CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    }
+
+    for (int i = 0; i < nodeHolder.getDataIndexMapLength().length; i++) {
+      indexBlockSize += nodeHolder.getDataIndexMapLength()[i];
+    }
+    ByteBuffer byteBuffer = ByteBuffer.allocate(
+        nodeHolder.getTotalDimensionArrayLength() + nodeHolder.getTotalMeasureArrayLength()
+            + indexBlockSize);
+    long offset = 0;
+    try {
+      // get the current offset
+      offset = channel.size();
+      // add key array to byte buffer
+      for (int i = 0; i < nodeHolder.getKeyArray().length; i++) {
+        byteBuffer.put(nodeHolder.getKeyArray()[i]);
+      }
+      for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
+        byteBuffer.put(nodeHolder.getDataArray()[i]);
+      }
+      // add measure data array to byte buffer
+
+      ByteBuffer buffer1 = null;
+      for (int i = 0; i < compressedIndex.length; i++) {
+        buffer1 = ByteBuffer.allocate(nodeHolder.getKeyBlockIndexLength()[i]);
+        buffer1.putInt(compressedIndex[i].length);
+        buffer1.put(compressedIndex[i]);
+        if (compressedIndexMap[i].length > 0) {
+          buffer1.put(compressedIndexMap[i]);
+        }
+        buffer1.rewind();
+        byteBuffer.put(buffer1.array());
+
+      }
+      for (int i = 0; i < compressedDataIndex.length; i++) {
+        byteBuffer.put(compressedDataIndex[i]);
+      }
+      byteBuffer.flip();
+      // write data to file
+      channel.write(byteBuffer);
+    } catch (IOException exception) {
+      throw new CarbonDataWriterException("Problem in writing carbon file: ", exception);
+    }
+    // return the offset, this offset will be used while reading the file in
+    // engine side to get from which position to start reading the file
+    return offset;
+  }
+
+  /**
+   * This method will be used to get the blocklet metadata
+   *
+   * @return BlockletInfo - blocklet metadata
+   */
+  protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
+    // create the info object for leaf entry
+    BlockletInfoColumnar info = new BlockletInfoColumnar();
+    //add aggBlocks array
+    info.setAggKeyBlock(nodeHolder.getAggBlocks());
+    // add total entry count
+    info.setNumberOfKeys(nodeHolder.getEntryCount());
+
+    // add the key array length
+    info.setKeyLengths(nodeHolder.getKeyLengths());
+    // adding null measure index bit set
+    info.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex());
+    //add column min max length
+    info.setColumnMaxData(nodeHolder.getColumnMaxData());
+    info.setColumnMinData(nodeHolder.getColumnMinData());
+    long[] keyOffSets = new long[nodeHolder.getKeyLengths().length];
+
+    for (int i = 0; i < keyOffSets.length; i++) {
+      keyOffSets[i] = offset;
+      offset += nodeHolder.getKeyLengths()[i];
+    }
+    // key offset will be 8 bytes from current offset because first 4 bytes
+    // will be for number of entry in leaf, then next 4 bytes will be for
+    // key lenght;
+    //        offset += CarbonCommonConstants.INT_SIZE_IN_BYTE * 2;
+
+    // add key offset
+    info.setKeyOffSets(keyOffSets);
+
+    // add measure length
+    info.setMeasureLength(nodeHolder.getMeasureLenght());
+
+    long[] msrOffset = new long[dataWriterVo.getMeasureCount()];
+
+    for (int i = 0; i < msrOffset.length; i++) {
+      // increment the current offset by 4 bytes because 4 bytes will be
+      // used for measure byte length
+      //            offset += CarbonCommonConstants.INT_SIZE_IN_BYTE;
+      msrOffset[i] = offset;
+      // now increment the offset by adding measure length to get the next
+      // measure offset;
+      offset += nodeHolder.getMeasureLenght()[i];
+    }
+    // add measure offset
+    info.setMeasureOffset(msrOffset);
+    info.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock());
+    info.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength());
+    long[] keyBlockIndexOffsets = new long[nodeHolder.getKeyBlockIndexLength().length];
+    for (int i = 0; i < keyBlockIndexOffsets.length; i++) {
+      keyBlockIndexOffsets[i] = offset;
+      offset += nodeHolder.getKeyBlockIndexLength()[i];
+    }
+    info.setDataIndexMapLength(nodeHolder.getDataIndexMapLength());
+    long[] dataIndexMapOffsets = new long[nodeHolder.getDataIndexMapLength().length];
+    for (int i = 0; i < dataIndexMapOffsets.length; i++) {
+      dataIndexMapOffsets[i] = offset;
+      offset += nodeHolder.getDataIndexMapLength()[i];
+    }
+    info.setDataIndexMapOffsets(dataIndexMapOffsets);
+    info.setKeyBlockIndexOffSets(keyBlockIndexOffsets);
+    // set startkey
+    info.setStartKey(nodeHolder.getStartKey());
+    // set end key
+    info.setEndKey(nodeHolder.getEndKey());
+    info.setCompressionModel(nodeHolder.getCompressionModel());
+    // return leaf metadata
+
+    //colGroup Blocks
+    info.setColGrpBlocks(nodeHolder.getColGrpBlocks());
+
+    return info;
+  }
+
+  /**
+   * This method will write metadata at the end of the file in thrift format
+   */
+  protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
+      String filePath) throws CarbonDataWriterException {
+    try {
+      long currentPosition = channel.size();
+      CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
+      FileFooter convertFileMeta = CarbonMetadataUtil
+          .convertFileFooter(infoList, localCardinality.length, localCardinality,
+              thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
+      fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
+      writer.writeFooter(convertFileMeta, currentPosition);
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
+    }
+  }
+}
\ No newline at end of file

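Note: in the V1 writer above, buildDataNodeHolder serializes the blocklet start and end keys as <length of dictionary key><length of no-dictionary key><dictionary key><no-dictionary key>, each length a 4-byte int. A self-contained restatement of just that encoding (the class and method names here are hypothetical helpers; the layout matches the ByteBuffer code above):

    import java.nio.ByteBuffer;

    final class CompositeKeyEncoding {
      // Packs a dictionary key and a no-dictionary key into the single byte[]
      // handed to NodeHolder.setStartKey / setEndKey.
      static byte[] encode(byte[] dictionaryKey, byte[] noDictionaryKey) {
        // 8 = two 4-byte length ints, matching CarbonCommonConstants.INT_SIZE_IN_BYTE twice
        ByteBuffer buffer =
            ByteBuffer.allocate(8 + dictionaryKey.length + noDictionaryKey.length);
        buffer.putInt(dictionaryKey.length);     // 4 bytes: dictionary key length
        buffer.putInt(noDictionaryKey.length);   // 4 bytes: no-dictionary key length
        buffer.put(dictionaryKey);
        buffer.put(noDictionaryKey);
        return buffer.array();
      }
    }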
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
new file mode 100644
index 0000000..f9f45cc
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.store.writer.v2;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.writer.CarbonFooterWriter;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
+import org.apache.carbondata.processing.store.writer.NodeHolder;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
+
+/**
+ * Below method will be used to write the data in version 2 format
+ */
+public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
+
+  /**
+   * logger
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonFactDataWriterImplV2.class.getName());
+
+  /**
+   * Constructor create instance of this class
+   *
+   * @param dataWriterVo
+   */
+  public CarbonFactDataWriterImplV2(CarbonDataWriterVo dataWriterVo) {
+    super(dataWriterVo);
+  }
+
+  /**
+   * Below method will be used to write the data to carbon data file
+   *
+   * @param holder
+   * @throws CarbonDataWriterException any problem in writing operation
+   */
+  @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
+    // size to calculate the size of the blocklet
+    int size = 0;
+    // get the blocklet info object
+    BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, 0);
+
+    List<DataChunk2> datachunks = null;
+    try {
+      // get all the data chunks
+      datachunks = CarbonMetadataUtil
+          .getDatachunk2(blockletInfo, thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while getting the data chunks", e);
+    }
+    // data chunk byte array
+    byte[][] dataChunkByteArray = new byte[datachunks.size()][];
+    for (int i = 0; i < dataChunkByteArray.length; i++) {
+      dataChunkByteArray[i] = CarbonUtil.getByteArray(datachunks.get(i));
+      // add the data chunk size
+      size += dataChunkByteArray[i].length;
+    }
+    // add row id index length
+    for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
+      size += holder.getKeyBlockIndexLength()[i];
+    }
+    // add rle index length
+    for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
+      size += holder.getDataIndexMapLength()[i];
+    }
+    // add dimension column data page and measure column data page size
+    long blockletDataSize =
+        holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength() + size;
+    // if size of the file already reached threshold size then create a new file and get the file
+    // channel object
+    updateBlockletFileChannel(blockletDataSize);
+    // write the version header in the file if current file size is zero
+    // this is done so the carbondata file can be read separately
+    try {
+      if (fileChannel.size() == 0) {
+        ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+        byte[] header = (CarbonCommonConstants.CARBON_DATA_VERSION_HEADER + version).getBytes();
+        ByteBuffer buffer = ByteBuffer.allocate(header.length);
+        buffer.put(header);
+        buffer.rewind();
+        fileChannel.write(buffer);
+      }
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while getting the file channel size", e);
+    }
+    // write data to file and get its offset
+    writeDataToFile(holder, dataChunkByteArray, fileChannel);
+    // add blocklet info to list
+    blockletInfoList.add(blockletInfo);
+    LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
+  }
+
+  /**
+   * Below method will be used to write the data to file
+   * Data Format
+   * <DColumn1DataChunk><DColumnDataPage><DColumnRle>
+   * <DColumn2DataChunk><DColumn2DataPage><DColumn2RowIds><DColumn2Rle>
+   * <DColumn3DataChunk><DColumn3DataPage><column3RowIds>
+   * <MColumn1DataChunk><MColumn1DataPage>
+   * <MColumn2DataChunk><MColumn2DataPage>
+   * <MColumn2DataChunk><MColumn2DataPage>
+   *
+   * @param nodeHolder
+   * @param dataChunksBytes
+   * @param channel
+   * @throws CarbonDataWriterException
+   */
+  private void writeDataToFile(NodeHolder nodeHolder, byte[][] dataChunksBytes, FileChannel channel)
+      throws CarbonDataWriterException {
+    long offset = 0;
+    try {
+      offset = channel.size();
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while getting the file channel size", e);
+    }
+    List<Long> currentDataChunksOffset = new ArrayList<>();
+    List<Short> currentDataChunksLength = new ArrayList<>();
+    dataChunksLength.add(currentDataChunksLength);
+    dataChunksOffsets.add(currentDataChunksOffset);
+    int bufferSize = 0;
+    int rowIdIndex = 0;
+    int rleIndex = 0;
+    for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
+      currentDataChunksOffset.add(offset);
+      currentDataChunksLength.add((short) dataChunksBytes[i].length);
+      bufferSize += dataChunksBytes[i].length + nodeHolder.getKeyLengths()[i] + (!nodeHolder
+          .getIsSortedKeyBlock()[i] ? nodeHolder.getKeyBlockIndexLength()[rowIdIndex] : 0) + (
+          dataWriterVo.getAggBlocks()[i] ?
+              nodeHolder.getCompressedDataIndex()[rleIndex].length :
+              0);
+      offset += dataChunksBytes[i].length;
+      offset += nodeHolder.getKeyLengths()[i];
+      if (!nodeHolder.getIsSortedKeyBlock()[i]) {
+        offset += nodeHolder.getKeyBlockIndexLength()[rowIdIndex];
+        rowIdIndex++;
+      }
+      if (dataWriterVo.getAggBlocks()[i]) {
+        offset += nodeHolder.getDataIndexMapLength()[rleIndex];
+        rleIndex++;
+      }
+    }
+    ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+    rleIndex = 0;
+    rowIdIndex = 0;
+    for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
+      buffer.put(dataChunksBytes[i]);
+      buffer.put(nodeHolder.getKeyArray()[i]);
+      if (!nodeHolder.getIsSortedKeyBlock()[i]) {
+        buffer.putInt(nodeHolder.getCompressedIndex()[rowIdIndex].length);
+        buffer.put(nodeHolder.getCompressedIndex()[rowIdIndex]);
+        if (nodeHolder.getCompressedIndexMap()[rowIdIndex].length > 0) {
+          buffer.put(nodeHolder.getCompressedIndexMap()[rowIdIndex]);
+        }
+        rowIdIndex++;
+      }
+      if (dataWriterVo.getAggBlocks()[i]) {
+        buffer.put(nodeHolder.getCompressedDataIndex()[rleIndex]);
+        rleIndex++;
+      }
+    }
+    try {
+      buffer.flip();
+      channel.write(buffer);
+    } catch (IOException e) {
+      throw new CarbonDataWriterException(
+          "Problem while writing the dimension data in carbon data file", e);
+    }
+
+    int dataChunkIndex = nodeHolder.getKeyArray().length;
+    int totalLength = 0;
+    for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
+      currentDataChunksOffset.add(offset);
+      currentDataChunksLength.add((short) dataChunksBytes[dataChunkIndex].length);
+      offset += dataChunksBytes[dataChunkIndex].length;
+      offset += nodeHolder.getDataArray()[i].length;
+      totalLength += dataChunksBytes[dataChunkIndex].length;
+      totalLength += nodeHolder.getDataArray()[i].length;
+      dataChunkIndex++;
+    }
+    buffer = ByteBuffer.allocate(totalLength);
+    dataChunkIndex = nodeHolder.getKeyArray().length;
+    for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
+      buffer.put(dataChunksBytes[dataChunkIndex++]);
+      buffer.put(nodeHolder.getDataArray()[i]);
+    }
+    try {
+      buffer.flip();
+      channel.write(buffer);
+    } catch (IOException e) {
+      throw new CarbonDataWriterException(
+          "Problem while writing the measure data in carbon data file", e);
+    }
+  }
+
+  /**
+   * This method will be used to get the blocklet metadata
+   *
+   * @return BlockletInfo - blocklet metadata
+   */
+  protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
+    // create the info object for leaf entry
+    BlockletInfoColumnar info = new BlockletInfoColumnar();
+    //add aggBlocks array
+    info.setAggKeyBlock(nodeHolder.getAggBlocks());
+    // add total entry count
+    info.setNumberOfKeys(nodeHolder.getEntryCount());
+
+    // add the key array length
+    info.setKeyLengths(nodeHolder.getKeyLengths());
+    // adding null measure index bit set
+    info.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex());
+    //add column min max length
+    info.setColumnMaxData(nodeHolder.getColumnMaxData());
+    info.setColumnMinData(nodeHolder.getColumnMinData());
+
+    // add measure length
+    info.setMeasureLength(nodeHolder.getMeasureLenght());
+
+    info.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock());
+    info.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength());
+    info.setDataIndexMapLength(nodeHolder.getDataIndexMapLength());
+    // set startkey
+    info.setStartKey(nodeHolder.getStartKey());
+    // set end key
+    info.setEndKey(nodeHolder.getEndKey());
+    info.setCompressionModel(nodeHolder.getCompressionModel());
+    // return leaf metadata
+
+    //colGroup Blocks
+    info.setColGrpBlocks(nodeHolder.getColGrpBlocks());
+
+    return info;
+  }
+
+  /**
+   * This method will write metadata at the end of the file in thrift format
+   */
+  protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
+      String filePath) throws CarbonDataWriterException {
+    try {
+      // get the current file position
+      long currentPosition = channel.size();
+      CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
+      // get thrift file footer instance
+      FileFooter convertFileMeta = CarbonMetadataUtil
+          .convertFilterFooter2(infoList, localCardinality, thriftColumnSchemaList,
+              dataChunksOffsets, dataChunksLength);
+      // fill the carbon index details
+      fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
+      // write the footer
+      writer.writeFooter(convertFileMeta, currentPosition);
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
+    }
+  }
+}

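Note: unlike V1, the V2 writer interleaves metadata with data (per dimension column: DataChunk2, data page, optional row-id index, optional RLE index; then per measure column: DataChunk2 and data page) and prefixes each new file with a plain-text version header so the file can be identified without reading the footer. A reader-side sketch of checking that header, assuming it is exactly the bytes of CarbonCommonConstants.CARBON_DATA_VERSION_HEADER followed by the version string, as written in writeBlockletData above (the helper itself is hypothetical):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.util.Arrays;

    final class VersionHeaderCheck {
      // Returns true if the file starts with the expected version header bytes.
      static boolean hasVersionHeader(FileChannel channel, byte[] expectedHeader)
          throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(expectedHeader.length);
        channel.read(buffer, 0);   // the writer puts the header at offset 0
        return Arrays.equals(buffer.array(), expectedHeader);
      }
    }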
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index 84192b8..328e33b 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.BlockIndexStore;
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
@@ -52,7 +53,7 @@ public class BlockIndexStoreTest extends TestCase {
   @BeforeClass public void setUp() {
 	property = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
 	
-	CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1");
+	CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "V1");
     StoreCreator.createCarbonStore();
     indexStore = BlockIndexStore.getInstance();
   }
@@ -61,7 +62,7 @@ public class BlockIndexStoreTest extends TestCase {
           if(null!=property) {
               CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, property);
           }else {
-               CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION+"");
+               CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
            }
          }
 
@@ -71,7 +72,7 @@ public class BlockIndexStoreTest extends TestCase {
     File file = getPartFile();
     TableBlockInfo info =
         new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length(),(short)1);
+            file.length(), ColumnarFormatVersion.V1);
     CarbonTableIdentifier carbonTableIdentifier =
             new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
     AbsoluteTableIdentifier absoluteTableIdentifier =
@@ -93,20 +94,20 @@ public class BlockIndexStoreTest extends TestCase {
     File file = getPartFile();
     TableBlockInfo info =
         new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
     TableBlockInfo info1 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
 
     TableBlockInfo info2 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
     TableBlockInfo info3 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
     TableBlockInfo info4 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
 
     CarbonTableIdentifier carbonTableIdentifier =
             new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
@@ -148,31 +149,31 @@ public class BlockIndexStoreTest extends TestCase {
     File file = getPartFile();
     TableBlockInfo info =
         new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
     TableBlockInfo info1 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
 
     TableBlockInfo info2 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
     TableBlockInfo info3 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
     TableBlockInfo info4 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
 
     TableBlockInfo info5 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
-            file.length(),(short)1);
+            file.length(),ColumnarFormatVersion.V1);
     TableBlockInfo info6 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
 
     TableBlockInfo info7 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "3", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
 
     CarbonTableIdentifier carbonTableIdentifier =
             new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");

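Note: the test changes above track two API changes in this commit: TableBlockInfo now takes a ColumnarFormatVersion constant instead of a raw (short) version number, and the CARBON_DATA_FILE_VERSION property is now set to "V1" rather than "1". A sketch of what such an enum plausibly looks like (only the V1 constant and its use in TableBlockInfo are confirmed by this diff; the V2 constant and the number() accessor are assumptions):

    public enum ColumnarFormatVersion {
      V1((short) 1),
      V2((short) 2);   // assumed: mirrors the new v2 writer package

      private final short number;

      ColumnarFormatVersion(short number) {
        this.number = number;
      }

      public short number() {
        return number;
      }
    }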