Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 9ad98f432 -> 7213ac057


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 16bd771..279bb63 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -26,6 +26,7 @@ import java.io.FileFilter;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -39,7 +40,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
 import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletBTreeIndex;
 import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex;
@@ -52,22 +52,21 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.carbon.path.CarbonStorePath;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.columnar.IndexStorage;
+import org.apache.carbondata.core.datastorage.store.compression.SnappyCompression.SnappyByteCompression;
 import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonMergerUtil;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.writer.CarbonFooterWriter;
 import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
 import org.apache.carbondata.format.BlockIndex;
-import org.apache.carbondata.format.FileFooter;
 import org.apache.carbondata.format.IndexHeader;
 import org.apache.carbondata.processing.mdkeygen.file.FileData;
-import org.apache.carbondata.processing.mdkeygen.file.IFileManagerComposite;
-import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 
 import org.apache.commons.lang3.ArrayUtils;
@@ -87,10 +86,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    */
   private static final int HDFS_CHECKSUM_LENGTH = 512;
   /**
-   * measure count
-   */
-  protected int measureCount;
-  /**
    * file channel
    */
   protected FileChannel fileChannel;
@@ -98,16 +93,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * this will be used for holding blocklet metadata
    */
   protected List<BlockletInfoColumnar> blockletInfoList;
-  /**
-   * keyBlockSize
-   */
-  protected int[] keyBlockSize;
   protected boolean[] isNoDictionary;
   /**
-   * mdkeySize
-   */
-  protected int mdkeySize;
-  /**
    * file name
    */
   protected String fileName;
@@ -115,15 +102,14 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * Local cardinality for the segment
    */
   protected int[] localCardinality;
-  protected String databaseName;
   /**
    * thrift column schema
    */
   protected List<org.apache.carbondata.format.ColumnSchema> thriftColumnSchemaList;
-  /**
-   * tabel name
-   */
-  private String tableName;
+  protected NumberCompressor numberCompressor;
+  protected CarbonDataWriterVo dataWriterVo;
+  protected List<List<Long>> dataChunksOffsets;
+  protected List<List<Short>> dataChunksLength;
   /**
    * data file size;
    */
@@ -133,32 +119,15 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    */
   private int fileCount;
   /**
-   * File manager
-   */
-  private IFileManagerComposite fileManager;
-  /**
-   * Store Location
-   */
-  private String storeLocation;
-  /**
    * executorService
    */
   private ExecutorService executorService;
-
   /**
    * executorService
    */
   private List<Future<Void>> executorServiceSubmitList;
-  /**
-   * data file attributes which will used for file construction
-   */
-  private CarbonDataFileAttributes carbonDataFileAttributes;
   private CarbonTablePath carbonTablePath;
   /**
-   * data directory location in carbon store path
-   */
-  private String carbonDataDirectoryPath;
-  /**
    * data block size for one carbon data file
    */
   private long dataBlockSize;
@@ -171,68 +140,55 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    */
   private int spaceReservedForBlockMetaSize;
   private FileOutputStream fileOutputStream;
-
-  private SegmentProperties segmentProperties;
-
   private List<BlockIndexInfo> blockIndexInfoList;
 
-  public AbstractFactDataWriter(String storeLocation, int measureCount, int mdKeyLength,
-      String databaseName, String tableName, IFileManagerComposite fileManager, int[] keyBlockSize,
-      CarbonDataFileAttributes carbonDataFileAttributes, List<ColumnSchema> columnSchema,
-      String carbonDataDirectoryPath, int[] colCardinality, SegmentProperties segmentProperties,
-      int blocksize) {
-
-    // measure count
-    this.measureCount = measureCount;
-    // table name
-    this.tableName = tableName;
-    this.databaseName = databaseName;
-
-    this.databaseName = databaseName;
-
-    this.storeLocation = storeLocation;
-    this.segmentProperties = segmentProperties;
+  public AbstractFactDataWriter(CarbonDataWriterVo dataWriterVo) {
+    this.dataWriterVo = dataWriterVo;
     this.blockletInfoList =
         new ArrayList<BlockletInfoColumnar>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     blockIndexInfoList = new ArrayList<>();
     // get max file size;
     CarbonProperties propInstance = CarbonProperties.getInstance();
     // if blocksize=2048, then 2048*1024*1024 will beyond the range of Int
-    this.fileSizeInBytes = (long) blocksize * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
-        * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
+    this.fileSizeInBytes =
+        (long) dataWriterVo.getTableBlocksize() * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
+            * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
     this.spaceReservedForBlockMetaSize = Integer.parseInt(propInstance
         .getProperty(CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE,
             CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE_DEFAULT));
     this.dataBlockSize = fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;
     LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " + dataBlockSize);
-    this.fileManager = fileManager;
-    this.carbonDataDirectoryPath = carbonDataDirectoryPath;
-    this.keyBlockSize = keyBlockSize;
-    this.mdkeySize = mdKeyLength;
+
     this.executorService = Executors.newFixedThreadPool(1);
     executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     // in case of compaction we will pass the cardinality.
-    this.localCardinality = colCardinality;
-    this.carbonDataFileAttributes = carbonDataFileAttributes;
-    CarbonTable carbonTable = CarbonMetadata.getInstance()
-        .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
-    carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(storeLocation, carbonTable.getCarbonTableIdentifier());
+    this.localCardinality = dataWriterVo.getColCardinality();
+    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+        dataWriterVo.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + dataWriterVo
+            .getTableName());
+    carbonTablePath = CarbonStorePath.getCarbonTablePath(dataWriterVo.getStoreLocation(),
+        carbonTable.getCarbonTableIdentifier());
     //TODO: We should delete the levelmetadata file after reading here.
     // so only data loading flow will need to read from cardinality file.
     if (null == this.localCardinality) {
-      this.localCardinality =
-          CarbonMergerUtil.getCardinalityFromLevelMetadata(storeLocation, tableName);
+      this.localCardinality = CarbonMergerUtil
+          .getCardinalityFromLevelMetadata(dataWriterVo.getStoreLocation(),
+              dataWriterVo.getTableName());
       List<Integer> cardinalityList = new ArrayList<Integer>();
-      thriftColumnSchemaList =
-          getColumnSchemaListAndCardinality(cardinalityList, localCardinality, columnSchema);
+      thriftColumnSchemaList = getColumnSchemaListAndCardinality(cardinalityList, localCardinality,
+          dataWriterVo.getWrapperColumnSchemaList());
       localCardinality =
           ArrayUtils.toPrimitive(cardinalityList.toArray(new Integer[cardinalityList.size()]));
     } else { // for compaction case
       List<Integer> cardinalityList = new ArrayList<Integer>();
-      thriftColumnSchemaList =
-          getColumnSchemaListAndCardinality(cardinalityList, localCardinality, columnSchema);
+      thriftColumnSchemaList = getColumnSchemaListAndCardinality(cardinalityList, localCardinality,
+          dataWriterVo.getWrapperColumnSchemaList());
     }
+    this.numberCompressor = new NumberCompressor(Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
+            CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)));
+    this.dataChunksOffsets = new ArrayList<>();
+    this.dataChunksLength = new ArrayList<>();
   }
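
Note: the "beyond the range of Int" comment in the constructor above is the
reason for the (long) cast. A minimal standalone sketch (class name
hypothetical) of the overflow it avoids:

    public class BlockSizeOverflowSketch {
      public static void main(String[] args) {
        int blocksize = 2048; // table block size in MB
        // int arithmetic overflows before the value is widened to long:
        long overflowed = blocksize * 1024 * 1024;      // -2147483648
        // casting the first operand keeps the whole product in long:
        long correct = (long) blocksize * 1024 * 1024;  //  2147483648
        System.out.println(overflowed + " vs " + correct);
      }
    }
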
 
   /**
@@ -258,8 +214,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
     String readableFileSize = ByteUtil.convertByteToReadable(fileSize);
     String readableMaxSize = ByteUtil.convertByteToReadable(maxSize);
     LOGGER.info("The configured block size is " + readableBlockSize +
-            ", the actual carbon file size is " + readableFileSize +
-            ", choose the max value " + readableMaxSize + " as the block size on HDFS");
+        ", the actual carbon file size is " + readableFileSize +
+        ", choose the max value " + readableMaxSize + " as the block size on HDFS");
     return maxSize;
   }
 
@@ -291,6 +247,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
       this.currentFileSize = 0;
       blockletInfoList =
           new ArrayList<BlockletInfoColumnar>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+      this.dataChunksOffsets = new ArrayList<>();
+      this.dataChunksLength = new ArrayList<>();
       CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
       // rename carbon data file from in progress status to actual
       renameCarbonDataFile();
@@ -312,12 +270,12 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
     // increment the file sequence counter
     initFileCount();
     String carbonDataFileName = carbonTablePath
-        .getCarbonDataFileName(fileCount, carbonDataFileAttributes.getTaskId(),
-            carbonDataFileAttributes.getFactTimeStamp());
+        .getCarbonDataFileName(fileCount, dataWriterVo.getCarbonDataFileAttributes().getTaskId(),
+            dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp());
     String actualFileNameVal = carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
-    FileData fileData = new FileData(actualFileNameVal, this.storeLocation);
-    fileManager.add(fileData);
-    this.fileName = storeLocation + File.separator + carbonDataFileName
+    FileData fileData = new FileData(actualFileNameVal, dataWriterVo.getStoreLocation());
+    dataWriterVo.getFileManager().add(fileData);
+    this.fileName = dataWriterVo.getStoreLocation() + File.separator + carbonDataFileName
         + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
     this.fileCount++;
     try {
@@ -332,11 +290,10 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
 
   private int initFileCount() {
     int fileInitialCount = 0;
-    File[] dataFiles = new File(storeLocation).listFiles(new FileFilter() {
-
+    File[] dataFiles = new File(dataWriterVo.getStoreLocation()).listFiles(new FileFilter() {
       @Override public boolean accept(File pathVal) {
-        if (!pathVal.isDirectory() && pathVal.getName().startsWith(tableName) && pathVal.getName()
-            .contains(CarbonCommonConstants.FACT_FILE_EXT)) {
+        if (!pathVal.isDirectory() && pathVal.getName().startsWith(dataWriterVo.getTableName())
+            && pathVal.getName().contains(CarbonCommonConstants.FACT_FILE_EXT)) {
           return true;
         }
         return false;
@@ -359,20 +316,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   /**
    * This method will write metadata at the end of the file in thrift format
    */
-  protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
-      String filePath) throws CarbonDataWriterException {
-    try {
-      long currentPosition = channel.size();
-      CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
-      FileFooter convertFileMeta = CarbonMetadataUtil
-          .convertFileFooter(infoList, localCardinality.length, localCardinality,
-              thriftColumnSchemaList, segmentProperties);
-      fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
-      writer.writeFooter(convertFileMeta, currentPosition);
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
-    }
-  }
+  protected abstract void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList,
+      FileChannel channel, String filePath) throws CarbonDataWriterException;
 
   /**
   * Below method will be used to fill the block info details
@@ -382,7 +327,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * @param filePath        file path
    * @param currentPosition current offset
    */
-  private void fillBlockIndexInfoDetails(List<BlockletInfoColumnar> infoList, long numberOfRows,
+  protected void fillBlockIndexInfoDetails(List<BlockletInfoColumnar> infoList, long numberOfRows,
       String filePath, long currentPosition) {
 
     // as min-max will change for each blocklet and second blocklet min-max can be lesser than
@@ -448,56 +393,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    *
    * @return BlockletInfo - blocklet metadata
    */
-  protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
-    // create the info object for leaf entry
-    BlockletInfoColumnar infoObj = new BlockletInfoColumnar();
-    // add total entry count
-    infoObj.setNumberOfKeys(nodeHolder.getEntryCount());
-
-    // add the key array length
-    infoObj.setKeyLengths(nodeHolder.getKeyLengths());
-    //add column min max data
-    infoObj.setColumnMaxData(nodeHolder.getColumnMaxData());
-    infoObj.setColumnMinData(nodeHolder.getColumnMinData());
-    infoObj.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex());
-    long[] keyOffSets = new long[nodeHolder.getKeyLengths().length];
-
-    for (int i = 0; i < keyOffSets.length; i++) {
-      keyOffSets[i] = offset;
-      offset += nodeHolder.getKeyLengths()[i];
-    }
-    // add key offset
-    infoObj.setKeyOffSets(keyOffSets);
-
-    // add measure length
-    infoObj.setMeasureLength(nodeHolder.getMeasureLenght());
-
-    long[] msrOffset = new long[this.measureCount];
-
-    for (int i = 0; i < this.measureCount; i++) {
-      msrOffset[i] = offset;
-      // now increment the offset by adding measure length to get the next
-      // measure offset;
-      offset += nodeHolder.getMeasureLenght()[i];
-    }
-    // add measure offset
-    infoObj.setMeasureOffset(msrOffset);
-    infoObj.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock());
-    infoObj.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength());
-    long[] keyBlockIndexOffsets = new long[nodeHolder.getKeyBlockIndexLength().length];
-    for (int i = 0; i < keyBlockIndexOffsets.length; i++) {
-      keyBlockIndexOffsets[i] = offset;
-      offset += nodeHolder.getKeyBlockIndexLength()[i];
-    }
-    infoObj.setKeyBlockIndexOffSets(keyBlockIndexOffsets);
-    // set startkey
-    infoObj.setStartKey(nodeHolder.getStartKey());
-    // set end key
-    infoObj.setEndKey(nodeHolder.getEndKey());
-    infoObj.setCompressionModel(nodeHolder.getCompressionModel());
-    // return leaf metadata
-    return infoObj;
-  }
+  protected abstract BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset);
 
   /**
    * Method will be used to close the open file channel
@@ -528,9 +424,9 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
         CarbonMetadataUtil.getIndexHeader(localCardinality, thriftColumnSchemaList);
     // get the block index info thrift
     List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
-    String fileName = storeLocation + File.separator + carbonTablePath
-        .getCarbonIndexFileName(carbonDataFileAttributes.getTaskId(),
-            carbonDataFileAttributes.getFactTimeStamp());
+    String fileName = dataWriterVo.getStoreLocation() + File.separator + carbonTablePath
+        .getCarbonIndexFileName(dataWriterVo.getCarbonDataFileAttributes().getTaskId(),
+            dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp());
     CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
     // open file
     writer.openThriftWriter(fileName);
@@ -591,11 +487,11 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   private void copyCarbonDataFileToCarbonStorePath(String localFileName)
       throws CarbonDataWriterException {
     long copyStartTime = System.currentTimeMillis();
-    LOGGER.info("Copying " + localFileName + " --> " + carbonDataDirectoryPath);
+    LOGGER.info("Copying " + localFileName + " --> " + dataWriterVo.getCarbonDataDirectoryPath());
     try {
       CarbonFile localCarbonFile =
           FileFactory.getCarbonFile(localFileName, FileFactory.getFileType(localFileName));
-      String carbonFilePath = carbonDataDirectoryPath + localFileName
+      String carbonFilePath = dataWriterVo.getCarbonDataDirectoryPath() + localFileName
           .substring(localFileName.lastIndexOf(File.separator));
       copyLocalFileToCarbonStore(carbonFilePath, localFileName,
           CarbonCommonConstants.BYTEBUFFER_SIZE,
@@ -654,18 +550,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * @throws CarbonDataWriterException
    * @throws CarbonDataWriterException throws new CarbonDataWriterException if any problem
    */
-  protected void writeDataToFile(NodeHolder nodeHolder) throws CarbonDataWriterException {
-    // write data to file and get its offset
-    long offset = writeDataToFile(nodeHolder, fileChannel);
-    // get the blocklet info for currently added blocklet
-    BlockletInfoColumnar blockletInfo = getBlockletInfo(nodeHolder, offset);
-    // add blocklet info to list
-    blockletInfoList.add(blockletInfo);
-    // calculate the current size of the file
-  }
-
-  protected abstract long writeDataToFile(NodeHolder nodeHolder, FileChannel channel)
-      throws CarbonDataWriterException;
+  public abstract void writeBlockletData(NodeHolder nodeHolder) throws CarbonDataWriterException;
 
   @Override public int getLeafMetadataSize() {
     return blockletInfoList.size();
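
Note: with the hunk above, the base class no longer writes blocklets itself;
each format version implements writeBlockletData and getBlockletInfo. A
simplified sketch of the resulting template shape (class name hypothetical,
bodies illustrative only):

    abstract class FactDataWriterShape {
      // base class keeps the shared bookkeeping (blocklet list, file channel)
      public abstract void writeBlockletData(NodeHolder holder)
          throws CarbonDataWriterException;
      protected abstract BlockletInfoColumnar getBlockletInfo(NodeHolder holder, long offset);
    }
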
@@ -675,6 +560,99 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
     return this.fileName;
   }
 
+  protected byte[][] fillAndCompressedKeyBlockData(IndexStorage<int[]>[] keyStorageArray,
+      int entryCount) {
+    byte[][] keyBlockData = new byte[keyStorageArray.length][];
+    int destPos = 0;
+    int keyBlockSizePosition = -1;
+    for (int i = 0; i < keyStorageArray.length; i++) {
+      destPos = 0;
+      //handling for high card dims
+      if (!dataWriterVo.getIsComplexType()[i] && !dataWriterVo.getIsDictionaryColumn()[i]) {
+        int totalLength = 0;
+        // calc size of the total bytes in all the columns.
+        for (int k = 0; k < keyStorageArray[i].getKeyBlock().length; k++) {
+          byte[] colValue = keyStorageArray[i].getKeyBlock()[k];
+          totalLength += colValue.length;
+        }
+        keyBlockData[i] = new byte[totalLength];
+
+        for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
+          int length = keyStorageArray[i].getKeyBlock()[j].length;
+          System
+              .arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos, length);
+          destPos += length;
+        }
+      } else {
+        keyBlockSizePosition++;
+        if (dataWriterVo.getAggBlocks()[i]) {
+          keyBlockData[i] = new byte[keyStorageArray[i].getTotalSize()];
+          for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
+            System.arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos,
+                keyStorageArray[i].getKeyBlock()[j].length);
+            destPos += keyStorageArray[i].getKeyBlock()[j].length;
+          }
+        } else {
+          if (dataWriterVo.getIsComplexType()[i]) {
+            keyBlockData[i] = new byte[keyStorageArray[i].getKeyBlock().length * dataWriterVo
+                .getKeyBlockSize()[keyBlockSizePosition]];
+          } else {
+            keyBlockData[i] =
+                new byte[entryCount * dataWriterVo.getKeyBlockSize()[keyBlockSizePosition]];
+          }
+          for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
+            System.arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos,
+                dataWriterVo.getKeyBlockSize()[keyBlockSizePosition]);
+            destPos += dataWriterVo.getKeyBlockSize()[keyBlockSizePosition];
+          }
+        }
+      }
+      keyBlockData[i] = SnappyByteCompression.INSTANCE.compress(keyBlockData[i]);
+    }
+    return keyBlockData;
+  }
+
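
Note: in the high-cardinality branch above, a column's row-wise key block is
flattened into one array before Snappy compression. A JDK-only sketch of that
flattening step (method name hypothetical, compression omitted):

    static byte[] flatten(byte[][] keyBlock) {
      int totalLength = 0;
      for (byte[] row : keyBlock) {
        totalLength += row.length;      // size pass, as in the method above
      }
      byte[] flat = new byte[totalLength];
      int destPos = 0;
      for (byte[] row : keyBlock) {
        System.arraycopy(row, 0, flat, destPos, row.length);
        destPos += row.length;          // copy pass
      }
      return flat;
    }
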
+  /**
+   * Below method will be used to update the min or max value
+   * by removing the length from it
+   *
+   * @return min max value without length
+   */
+  protected byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) {
+    ByteBuffer buffer = ByteBuffer.wrap(valueWithLength);
+    byte[] actualValue = new byte[buffer.getShort()];
+    buffer.get(actualValue);
+    return actualValue;
+  }
+
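
Note: updateMinMaxForNoDictionary above assumes each stored min/max value is a
2-byte length prefix followed by the payload. A worked example (values and
class name hypothetical):

    import java.nio.ByteBuffer;

    public class MinMaxPrefixSketch {
      public static void main(String[] args) {
        // stored form: [short length][payload]
        byte[] stored = ByteBuffer.allocate(2 + 3)
            .putShort((short) 3)
            .put(new byte[] {'a', 'b', 'c'})
            .array();
        // same logic as updateMinMaxForNoDictionary
        ByteBuffer buffer = ByteBuffer.wrap(stored);
        byte[] actualValue = new byte[buffer.getShort()];
        buffer.get(actualValue);
        System.out.println(new String(actualValue)); // prints "abc"
      }
    }
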
+  /**
+   * Below method will be used to update the no dictionary start and end key
+   *
+   * @param key key to be updated
+   * @return return no dictionary key
+   */
+  protected byte[] updateNoDictionaryStartAndEndKey(byte[] key) {
+    if (key.length == 0) {
+      return key;
+    }
+    // add key to byte buffer remove the length part of the data
+    ByteBuffer buffer = ByteBuffer.wrap(key, 2, key.length - 2);
+    // create an output buffer without length
+    ByteBuffer output = ByteBuffer.allocate(key.length - 2);
+    short numberOfByteToStorLength = 2;
+    // as length part is removed, so each no dictionary value index
+    // needs to be reshuffled by 2 bytes
+    for (int i = 0; i < dataWriterVo.getNoDictionaryCount(); i++) {
+      output.putShort((short) (buffer.getShort() - numberOfByteToStorLength));
+    }
+    // copy the data part
+    while (buffer.hasRemaining()) {
+      output.put(buffer.get());
+    }
+    output.rewind();
+    return output.array();
+  }
+
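
Note: the composite key above starts with one 2-byte offset per no-dictionary
column, so dropping the 2-byte length prefix also shifts every stored offset
down by 2. A worked example with a single no-dictionary column (values
hypothetical):

    // stored key:   [len=7][offset=4][payload 1 2 3]   (7 bytes)
    // returned key:        [offset=2][payload 1 2 3]   (5 bytes)
    byte[] key = java.nio.ByteBuffer.allocate(7)
        .putShort((short) 7)       // length part, removed by the method
        .putShort((short) 4)       // column offset, rewritten as 4 - 2 = 2
        .put(new byte[] {1, 2, 3})
        .array();
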
   /**
    * This method will copy the carbon data file from local store location to
    * carbon store location

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
new file mode 100644
index 0000000..6e0287d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.store.writer;
+
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.processing.mdkeygen.file.IFileManagerComposite;
+import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+
+/**
+ * Value object for writing the data
+ */
+public class CarbonDataWriterVo {
+
+  private String storeLocation;
+
+  private int measureCount;
+
+  private int mdKeyLength;
+
+  private String tableName;
+
+  private IFileManagerComposite fileManager;
+
+  private int[] keyBlockSize;
+
+  private boolean[] aggBlocks;
+
+  private boolean[] isComplexType;
+
+  private int NoDictionaryCount;
+
+  private CarbonDataFileAttributes carbonDataFileAttributes;
+
+  private String databaseName;
+
+  private List<ColumnSchema> wrapperColumnSchemaList;
+
+  private int numberOfNoDictionaryColumn;
+
+  private boolean[] isDictionaryColumn;
+
+  private String carbonDataDirectoryPath;
+
+  private int[] colCardinality;
+
+  private SegmentProperties segmentProperties;
+
+  private int tableBlocksize;
+
+  /**
+   * @return the storeLocation
+   */
+  public String getStoreLocation() {
+    return storeLocation;
+  }
+
+  /**
+   * @param storeLocation the storeLocation to set
+   */
+  public void setStoreLocation(String storeLocation) {
+    this.storeLocation = storeLocation;
+  }
+
+  /**
+   * @return the measureCount
+   */
+  public int getMeasureCount() {
+    return measureCount;
+  }
+
+  /**
+   * @param measureCount the measureCount to set
+   */
+  public void setMeasureCount(int measureCount) {
+    this.measureCount = measureCount;
+  }
+
+  /**
+   * @return the mdKeyLength
+   */
+  public int getMdKeyLength() {
+    return mdKeyLength;
+  }
+
+  /**
+   * @param mdKeyLength the mdKeyLength to set
+   */
+  public void setMdKeyLength(int mdKeyLength) {
+    this.mdKeyLength = mdKeyLength;
+  }
+
+  /**
+   * @return the tableName
+   */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /**
+   * @param tableName the tableName to set
+   */
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  /**
+   * @return the fileManager
+   */
+  public IFileManagerComposite getFileManager() {
+    return fileManager;
+  }
+
+  /**
+   * @param fileManager the fileManager to set
+   */
+  public void setFileManager(IFileManagerComposite fileManager) {
+    this.fileManager = fileManager;
+  }
+
+  /**
+   * @return the keyBlockSize
+   */
+  public int[] getKeyBlockSize() {
+    return keyBlockSize;
+  }
+
+  /**
+   * @param keyBlockSize the keyBlockSize to set
+   */
+  public void setKeyBlockSize(int[] keyBlockSize) {
+    this.keyBlockSize = keyBlockSize;
+  }
+
+  /**
+   * @return the aggBlocks
+   */
+  public boolean[] getAggBlocks() {
+    return aggBlocks;
+  }
+
+  /**
+   * @param aggBlocks the aggBlocks to set
+   */
+  public void setAggBlocks(boolean[] aggBlocks) {
+    this.aggBlocks = aggBlocks;
+  }
+
+  /**
+   * @return the isComplexType
+   */
+  public boolean[] getIsComplexType() {
+    return isComplexType;
+  }
+
+  /**
+   * @param isComplexType the isComplexType to set
+   */
+  public void setIsComplexType(boolean[] isComplexType) {
+    this.isComplexType = isComplexType;
+  }
+
+  /**
+   * @return the noDictionaryCount
+   */
+  public int getNoDictionaryCount() {
+    return NoDictionaryCount;
+  }
+
+  /**
+   * @param noDictionaryCount the noDictionaryCount to set
+   */
+  public void setNoDictionaryCount(int noDictionaryCount) {
+    NoDictionaryCount = noDictionaryCount;
+  }
+
+  /**
+   * @return the carbonDataFileAttributes
+   */
+  public CarbonDataFileAttributes getCarbonDataFileAttributes() {
+    return carbonDataFileAttributes;
+  }
+
+  /**
+   * @param carbonDataFileAttributes the carbonDataFileAttributes to set
+   */
+  public void setCarbonDataFileAttributes(CarbonDataFileAttributes carbonDataFileAttributes) {
+    this.carbonDataFileAttributes = carbonDataFileAttributes;
+  }
+
+  /**
+   * @return the databaseName
+   */
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  /**
+   * @param databaseName the databaseName to set
+   */
+  public void setDatabaseName(String databaseName) {
+    this.databaseName = databaseName;
+  }
+
+  /**
+   * @return the wrapperColumnSchemaList
+   */
+  public List<ColumnSchema> getWrapperColumnSchemaList() {
+    return wrapperColumnSchemaList;
+  }
+
+  /**
+   * @param wrapperColumnSchemaList the wrapperColumnSchemaList to set
+   */
+  public void setWrapperColumnSchemaList(List<ColumnSchema> wrapperColumnSchemaList) {
+    this.wrapperColumnSchemaList = wrapperColumnSchemaList;
+  }
+
+  /**
+   * @return the numberOfNoDictionaryColumn
+   */
+  public int getNumberOfNoDictionaryColumn() {
+    return numberOfNoDictionaryColumn;
+  }
+
+  /**
+   * @param numberOfNoDictionaryColumn the numberOfNoDictionaryColumn to set
+   */
+  public void setNumberOfNoDictionaryColumn(int numberOfNoDictionaryColumn) {
+    this.numberOfNoDictionaryColumn = numberOfNoDictionaryColumn;
+  }
+
+  /**
+   * @return the isDictionaryColumn
+   */
+  public boolean[] getIsDictionaryColumn() {
+    return isDictionaryColumn;
+  }
+
+  /**
+   * @param isDictionaryColumn the isDictionaryColumn to set
+   */
+  public void setIsDictionaryColumn(boolean[] isDictionaryColumn) {
+    this.isDictionaryColumn = isDictionaryColumn;
+  }
+
+  /**
+   * @return the carbonDataDirectoryPath
+   */
+  public String getCarbonDataDirectoryPath() {
+    return carbonDataDirectoryPath;
+  }
+
+  /**
+   * @param carbonDataDirectoryPath the carbonDataDirectoryPath to set
+   */
+  public void setCarbonDataDirectoryPath(String carbonDataDirectoryPath) {
+    this.carbonDataDirectoryPath = carbonDataDirectoryPath;
+  }
+
+  /**
+   * @return the colCardinality
+   */
+  public int[] getColCardinality() {
+    return colCardinality;
+  }
+
+  /**
+   * @param colCardinality the colCardinality to set
+   */
+  public void setColCardinality(int[] colCardinality) {
+    this.colCardinality = colCardinality;
+  }
+
+  /**
+   * @return the segmentProperties
+   */
+  public SegmentProperties getSegmentProperties() {
+    return segmentProperties;
+  }
+
+  /**
+   * @param segmentProperties the segmentProperties to set
+   */
+  public void setSegmentProperties(SegmentProperties segmentProperties) {
+    this.segmentProperties = segmentProperties;
+  }
+
+  /**
+   * @return the tableBlocksize
+   */
+  public int getTableBlocksize() {
+    return tableBlocksize;
+  }
+
+  /**
+   * @param tableBlocksize the tableBlocksize to set
+   */
+  public void setTableBlocksize(int tableBlocksize) {
+    this.tableBlocksize = tableBlocksize;
+  }
+
+}
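
Note: call sites now populate the value object once instead of threading a
dozen constructor arguments through. A hypothetical wiring sketch (all values
invented):

    CarbonDataWriterVo dataWriterVo = new CarbonDataWriterVo();
    dataWriterVo.setStoreLocation("/tmp/carbon/store"); // hypothetical path
    dataWriterVo.setDatabaseName("default");
    dataWriterVo.setTableName("sales");
    dataWriterVo.setMeasureCount(2);
    dataWriterVo.setMdKeyLength(8);
    dataWriterVo.setTableBlocksize(1024);               // in MB
    // remaining fields (keyBlockSize, aggBlocks, cardinality, ...) are set
    // the same way before the VO is handed to a writer implementation
    CarbonFactDataWriter<int[]> writer = new CarbonFactDataWriterImpl2(dataWriterVo);
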

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java
new file mode 100644
index 0000000..d399280
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.store.writer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.writer.CarbonFooterWriter;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+
+/**
+ * Below class will be used to write the data in version 2 format
+ */
+public class CarbonFactDataWriterImpl2 extends CarbonFactDataWriterImplForIntIndexAndAggBlock {
+
+  /**
+   * logger
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonFactDataWriterImpl2.class.getName());
+
+  /**
+   * Constructor create instance of this class
+   *
+   * @param dataWriterVo
+   */
+  public CarbonFactDataWriterImpl2(CarbonDataWriterVo dataWriterVo) {
+    super(dataWriterVo);
+  }
+
+  /**
+   * Below method will be used to write the data to carbon data file
+   *
+   * @param holder
+   * @throws CarbonDataWriterException any problem in writing operation
+   */
+  @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
+    // size to calculate the size of the blocklet
+    int size = 0;
+    // get the blocklet info object
+    BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, 0);
+
+    List<DataChunk2> datachunks = null;
+    try {
+      // get all the data chunks
+      datachunks = CarbonMetadataUtil
+          .getDatachunk2(blockletInfo, thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while getting the data chunks", e);
+    }
+    }
+    // data chunk byte array
+    byte[][] dataChunkByteArray = new byte[datachunks.size()][];
+    for (int i = 0; i < dataChunkByteArray.length; i++) {
+      dataChunkByteArray[i] = CarbonUtil.getByteArray(datachunks.get(i));
+      // add the data chunk size
+      size += dataChunkByteArray[i].length;
+    }
+    // add row id index length
+    for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
+      size += holder.getKeyBlockIndexLength()[i];
+    }
+    // add rle index length
+    for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
+      size += holder.getDataIndexMapLength()[i];
+    }
+    // add dimension column data page and measure column data page size
+    long blockletDataSize =
+        holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength() + size;
+    // if size of the file already reached threshold size then create a new file and get the file
+    // channel object
+    updateBlockletFileChannel(blockletDataSize);
+    // write the version header in the file if current file size is zero
+    // this is done so carbondata file can be read separately
+    try {
+      if (fileChannel.size() == 0) {
+        short version = Short.parseShort(CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
+        byte[] header = (CarbonCommonConstants.CARBON_DATA_VERSION_HEADER + version).getBytes();
+        ByteBuffer buffer = ByteBuffer.allocate(header.length);
+        buffer.put(header);
+        buffer.rewind();
+        fileChannel.write(buffer);
+      }
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while getting the file 
channel size", e);
+    }
+    // write data to file and get its offset
+    writeDataToFile(holder, dataChunkByteArray, fileChannel);
+    // add blocklet info to list
+    blockletInfoList.add(blockletInfo);
+    LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
+  }
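
Note: because the header is written only when the channel is empty, every V2
file begins with the version marker. A hedged reader-side sketch (constant
names come from the patch; the probing code itself is hypothetical):

    byte[] expected = (CarbonCommonConstants.CARBON_DATA_VERSION_HEADER + version).getBytes();
    ByteBuffer probe = ByteBuffer.allocate(expected.length);
    fileChannel.read(probe, 0);  // read back from the start of the file
    boolean isVersionedFile = java.util.Arrays.equals(expected, probe.array());
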
+
+  /**
+   * Below method will be used to write the data to file
+   * Data Format
+   * <DColumn1DataChunk><DColumnDataPage><DColumnRle>
+   * <DColumn2DataChunk><DColumn2DataPage><DColumn2RowIds><DColumn2Rle>
+   * <DColumn3DataChunk><DColumn3DataPage><column3RowIds>
+   * <MColumn1DataChunk><MColumn1DataPage>
+   * <MColumn2DataChunk><MColumn2DataPage>
+   * <MColumn2DataChunk><MColumn2DataPage>
+   *
+   * @param nodeHolder
+   * @param dataChunksBytes
+   * @param channel
+   * @throws CarbonDataWriterException
+   */
+  private void writeDataToFile(NodeHolder nodeHolder, byte[][] dataChunksBytes, FileChannel channel)
+      throws CarbonDataWriterException {
+    long offset = 0;
+    try {
+      offset = channel.size();
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while getting the file 
channel size");
+    }
+    List<Long> currentDataChunksOffset = new ArrayList<>();
+    List<Short> currentDataChunksLength = new ArrayList<>();
+    dataChunksLength.add(currentDataChunksLength);
+    dataChunksOffsets.add(currentDataChunksOffset);
+    int bufferSize = 0;
+    int rowIdIndex = 0;
+    int rleIndex = 0;
+    for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
+      currentDataChunksOffset.add(offset);
+      currentDataChunksLength.add((short) dataChunksBytes[i].length);
+      bufferSize += dataChunksBytes[i].length + nodeHolder.getKeyLengths()[i] + (!nodeHolder
+          .getIsSortedKeyBlock()[i] ? nodeHolder.getKeyBlockIndexLength()[rowIdIndex] : 0) + (
+          dataWriterVo.getAggBlocks()[i] ?
+              nodeHolder.getCompressedDataIndex()[rleIndex].length :
+              0);
+      offset += dataChunksBytes[i].length;
+      offset += nodeHolder.getKeyLengths()[i];
+      if (!nodeHolder.getIsSortedKeyBlock()[i]) {
+        offset += nodeHolder.getKeyBlockIndexLength()[rowIdIndex];
+        rowIdIndex++;
+      }
+      if (dataWriterVo.getAggBlocks()[i]) {
+        offset += nodeHolder.getDataIndexMapLength()[rleIndex];
+        rleIndex++;
+      }
+    }
+    ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+    rleIndex = 0;
+    rowIdIndex = 0;
+    for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
+      buffer.put(dataChunksBytes[i]);
+      buffer.put(nodeHolder.getKeyArray()[i]);
+      if (!nodeHolder.getIsSortedKeyBlock()[i]) {
+        buffer.putInt(nodeHolder.getCompressedIndex()[rowIdIndex].length);
+        buffer.put(nodeHolder.getCompressedIndex()[rowIdIndex]);
+        if (nodeHolder.getCompressedIndexMap()[rowIdIndex].length > 0) {
+          buffer.put(nodeHolder.getCompressedIndexMap()[rowIdIndex]);
+        }
+        rowIdIndex++;
+      }
+      if (dataWriterVo.getAggBlocks()[i]) {
+        buffer.put(nodeHolder.getCompressedDataIndex()[rleIndex]);
+        rleIndex++;
+      }
+    }
+    try {
+      buffer.flip();
+      channel.write(buffer);
+    } catch (IOException e) {
+      throw new CarbonDataWriterException(
+          "Problem while writing the dimension data in carbon data file", e);
+    }
+
+    int dataChunkIndex = nodeHolder.getKeyArray().length;
+    int totalLength = 0;
+    for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
+      currentDataChunksOffset.add(offset);
+      currentDataChunksLength.add((short) dataChunksBytes[dataChunkIndex].length);
+      offset += dataChunksBytes[dataChunkIndex].length;
+      offset += nodeHolder.getDataArray()[i].length;
+      totalLength += dataChunksBytes[dataChunkIndex].length;
+      totalLength += nodeHolder.getDataArray()[i].length;
+      dataChunkIndex++;
+    }
+    buffer = ByteBuffer.allocate(totalLength);
+    dataChunkIndex = nodeHolder.getKeyArray().length;
+    for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
+      buffer.put(dataChunksBytes[dataChunkIndex++]);
+      buffer.put(nodeHolder.getDataArray()[i]);
+    }
+    try {
+      buffer.flip();
+      channel.write(buffer);
+    } catch (IOException e) {
+      throw new CarbonDataWriterException(
+          "Problem while writing the measure data in carbon data file", e);
+    }
+  }
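
Note: writeDataToFile above makes two passes per section: it first records
each chunk's offset and length (later serialized into the footer so readers
can seek straight to a chunk), then fills one buffer and writes it with a
single channel call. A minimal sketch of the record-then-write pattern
(assuming a FileChannel channel and byte[][] chunks in scope; java.util and
java.nio imports):

    List<Long> offsets = new ArrayList<>();
    long offset = channel.size();
    int totalSize = 0;
    for (byte[] chunk : chunks) {
      offsets.add(offset);       // where this chunk will land in the file
      offset += chunk.length;
      totalSize += chunk.length;
    }
    ByteBuffer buffer = ByteBuffer.allocate(totalSize);
    for (byte[] chunk : chunks) {
      buffer.put(chunk);
    }
    buffer.flip();
    channel.write(buffer);       // one write keeps the recorded offsets valid
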
+
+  /**
+   * This method will be used to get the blocklet metadata
+   *
+   * @return BlockletInfo - blocklet metadata
+   */
+  protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
+    // create the info object for leaf entry
+    BlockletInfoColumnar info = new BlockletInfoColumnar();
+    //add aggBlocks array
+    info.setAggKeyBlock(nodeHolder.getAggBlocks());
+    // add total entry count
+    info.setNumberOfKeys(nodeHolder.getEntryCount());
+
+    // add the key array length
+    info.setKeyLengths(nodeHolder.getKeyLengths());
+    // adding null measure index bit set
+    info.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex());
+    //add column min max length
+    info.setColumnMaxData(nodeHolder.getColumnMaxData());
+    info.setColumnMinData(nodeHolder.getColumnMinData());
+
+    // add measure length
+    info.setMeasureLength(nodeHolder.getMeasureLenght());
+
+    info.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock());
+    info.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength());
+    info.setDataIndexMapLength(nodeHolder.getDataIndexMapLength());
+    // set startkey
+    info.setStartKey(nodeHolder.getStartKey());
+    // set end key
+    info.setEndKey(nodeHolder.getEndKey());
+    info.setCompressionModel(nodeHolder.getCompressionModel());
+    // return leaf metadata
+
+    //colGroup Blocks
+    info.setColGrpBlocks(nodeHolder.getColGrpBlocks());
+
+    return info;
+  }
+
+  /**
+   * This method will write metadata at the end of the file in thrift format
+   */
+  protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, 
FileChannel channel,
+      String filePath) throws CarbonDataWriterException {
+    try {
+      // get the current file position
+      long currentPosition = channel.size();
+      CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
+      // get thrift file footer instance
+      FileFooter convertFileMeta = CarbonMetadataUtil
+          .convertFilterFooter2(infoList, localCardinality, thriftColumnSchemaList,
+              dataChunksOffsets, dataChunksLength);
+      // fill the carbon index details
+      fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
+      // write the footer
+      writer.writeFooter(convertFileMeta, currentPosition);
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while writing the carbon 
file: ", e);
+    }
+  }
+}
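
Note: writeFooter receives the channel position at which the footer starts,
which suggests the reader locates the footer through that offset. A hedged
sketch of the general pattern (assumed layout, not confirmed by this patch):

    // append the serialized footer, then its start offset as a trailing long,
    // so a reader can read the offset from the file tail and jump to the footer
    long footerStart = channel.size();
    channel.write(ByteBuffer.wrap(serializedFooter));
    ByteBuffer tail = ByteBuffer.allocate(8).putLong(0, footerStart);
    channel.write(tail);
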

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
index 259482e..8c2608b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
@@ -26,48 +26,23 @@ import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastorage.store.columnar.IndexStorage;
-import org.apache.carbondata.core.datastorage.store.compression.SnappyCompression.SnappyByteCompression;
 import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
-import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.mdkeygen.file.IFileManagerComposite;
-import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.writer.CarbonFooterWriter;
+import org.apache.carbondata.format.FileFooter;
 import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 
 public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFactDataWriter<int[]> {
 
-  protected boolean[] aggBlocks;
-  private NumberCompressor numberCompressor;
-  private boolean[] isComplexType;
-  private int numberOfNoDictionaryColumn;
-  private boolean[] isDictionaryColumn;
-  private static final LogService LOGGER = LogServiceFactory.getLogService(
-      CarbonFactDataWriterImplForIntIndexAndAggBlock.class.getName());
+  private static final LogService LOGGER = LogServiceFactory
+      .getLogService(CarbonFactDataWriterImplForIntIndexAndAggBlock.class.getName());
 
-  public CarbonFactDataWriterImplForIntIndexAndAggBlock(String storeLocation, int measureCount,
-      int mdKeyLength, String tableName, IFileManagerComposite fileManager, int[] keyBlockSize,
-      boolean[] aggBlocks, boolean[] isComplexType, int NoDictionaryCount,
-      CarbonDataFileAttributes carbonDataFileAttributes, String databaseName,
-      List<ColumnSchema> wrapperColumnSchemaList, int numberOfNoDictionaryColumn,
-      boolean[] isDictionaryColumn, String carbonDataDirectoryPath, int[] colCardinality,
-      SegmentProperties segmentProperties, int tableBlocksize) {
-    super(storeLocation, measureCount, mdKeyLength, databaseName, tableName, fileManager,
-        keyBlockSize, carbonDataFileAttributes, wrapperColumnSchemaList, carbonDataDirectoryPath,
-        colCardinality, segmentProperties, tableBlocksize);
-    this.isComplexType = isComplexType;
-    this.databaseName = databaseName;
-    this.numberOfNoDictionaryColumn = numberOfNoDictionaryColumn;
-    this.isDictionaryColumn = isDictionaryColumn;
-    this.aggBlocks = aggBlocks;
-    this.numberCompressor = new NumberCompressor(Integer.parseInt(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
-            CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)));
+  public CarbonFactDataWriterImplForIntIndexAndAggBlock(CarbonDataWriterVo dataWriterVo) {
+    super(dataWriterVo);
   }
 
   @Override
@@ -110,7 +85,7 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
 
       }
       totalKeySize += keyLengths[i];
-      if (isComplexType[i] || isDictionaryColumn[i]) {
+      if (dataWriterVo.getIsComplexType()[i] || dataWriterVo.getIsDictionaryColumn()[i]) {
         allMinValue[i] = keyStorageArray[i].getMin();
         allMaxValue[i] = keyStorageArray[i].getMax();
       } else {
@@ -142,16 +117,16 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
       }
     }
     int compressDataBlockSize = 0;
-    for (int i = 0; i < aggBlocks.length; i++) {
-      if (aggBlocks[i]) {
+    for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
+      if (dataWriterVo.getAggBlocks()[i]) {
         compressDataBlockSize++;
       }
     }
     byte[][] compressedDataIndex = new byte[compressDataBlockSize][];
     int[] dataIndexMapLength = new int[compressDataBlockSize];
     idx = 0;
-    for (int i = 0; i < aggBlocks.length; i++) {
-      if (aggBlocks[i]) {
+    for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
+      if (dataWriterVo.getAggBlocks()[i]) {
         try {
           compressedDataIndex[idx] =
               numberCompressor.compress(keyStorageArray[i].getDataIndexMap());
@@ -163,13 +138,7 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
       }
     }
 
-    byte[] writableKeyArray = new byte[totalKeySize];
-    int startPosition = 0;
-    for (int i = 0; i < keyLengths.length; i++) {
-      System.arraycopy(keyBlockData[i], 0, writableKeyArray, startPosition, keyBlockData[i].length);
-      startPosition += keyLengths[i];
-    }
-    int[] msrLength = new int[this.measureCount];
+    int[] msrLength = new int[dataWriterVo.getMeasureCount()];
     // calculate the total size required for all the measure and get the
     // each measure size
     for (int i = 0; i < dataArray.length; i++) {
@@ -177,30 +146,9 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
       totalMsrArrySize += currentMsrLenght;
       msrLength[i] = currentMsrLenght;
     }
-    byte[] writableDataArray = new byte[totalMsrArrySize];
-
-    // start position will be used for adding the measure in
-    // writableDataArray after adding measure increment the start position
-    // by added measure length which will be used for next measure start
-    // position
-    startPosition = 0;
-    for (int i = 0; i < dataArray.length; i++) {
-      System.arraycopy(dataArray[i], 0, writableDataArray, startPosition, dataArray[i].length);
-      startPosition += msrLength[i];
-    }
-    // current file size;
-    int indexBlockSize = 0;
-    for (int i = 0; i < keyBlockIdxLengths.length; i++) {
-      indexBlockSize += keyBlockIdxLengths[i] + CarbonCommonConstants.INT_SIZE_IN_BYTE;
-    }
-
-    for (int i = 0; i < dataIndexMapLength.length; i++) {
-      indexBlockSize += dataIndexMapLength[i];
-    }
-
     NodeHolder holder = new NodeHolder();
-    holder.setDataArray(writableDataArray);
-    holder.setKeyArray(writableKeyArray);
+    holder.setDataArray(dataArray);
+    holder.setKeyArray(keyBlockData);
     // end key format will be <length of dictionary key><length of no
     // dictionary key><DictionaryKey><No Dictionary key>
     byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(noDictionaryEndKey);
@@ -235,10 +183,12 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
     holder.setDataIndexMapLength(dataIndexMapLength);
     holder.setCompressedDataIndex(compressedDataIndex);
     holder.setCompressionModel(compressionModel);
+    holder.setTotalDimensionArrayLength(totalKeySize);
+    holder.setTotalMeasureArrayLength(totalMsrArrySize);
     //setting column min max value
     holder.setColumnMaxData(allMaxValue);
     holder.setColumnMinData(allMinValue);
-    holder.setAggBlocks(aggBlocks);
+    holder.setAggBlocks(dataWriterVo.getAggBlocks());
     holder.setColGrpBlocks(colGrpBlock);
     return holder;
   }
@@ -252,113 +202,28 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
     for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
       indexBlockSize += holder.getDataIndexMapLength()[i];
     }
+
     long blockletDataSize =
-        holder.getKeyArray().length + holder.getDataArray().length + indexBlockSize;
+        holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength()
+            + indexBlockSize;
     updateBlockletFileChannel(blockletDataSize);
-    writeDataToFile(holder);
+    // write data to file and get its offset
+    long offset = writeDataToFile(holder, fileChannel);
+    // get the blocklet info for currently added blocklet
+    BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, offset);
+    // add blocklet info to list
+    blockletInfoList.add(blockletInfo);
     LOGGER.info("A new blocklet is added, its data size is: " + 
blockletDataSize + " Byte");
   }
 
   /**
-   * Below method will be used to update the min or max value
-   * by removing the length from it
-   *
-   * @return min max value without length
-   */
-  private byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) {
-    ByteBuffer buffer = ByteBuffer.wrap(valueWithLength);
-    byte[] actualValue = new byte[buffer.getShort()];
-    buffer.get(actualValue);
-    return actualValue;
-  }
-
-  /**
-   * Below method will be used to update the no dictionary start and end key
-   *
-   * @param key key to be updated
-   * @return return no dictionary key
-   */
-  private byte[] updateNoDictionaryStartAndEndKey(byte[] key) {
-    if (key.length == 0) {
-      return key;
-    }
-    // add key to byte buffer remove the length part of the data
-    ByteBuffer buffer = ByteBuffer.wrap(key, 2, key.length - 2);
-    // create a output buffer without length
-    ByteBuffer output = ByteBuffer.allocate(key.length - 2);
-    short numberOfByteToStorLength = 2;
-    // as length part is removed, so each no dictionary value index
-    // needs to be reshuffled by 2 bytes
-    for (int i = 0; i < numberOfNoDictionaryColumn; i++) {
-      output.putShort((short) (buffer.getShort() - numberOfByteToStorLength));
-    }
-    // copy the data part
-    while (buffer.hasRemaining()) {
-      output.put(buffer.get());
-    }
-    output.rewind();
-    return output.array();
-  }
-
-  protected byte[][] fillAndCompressedKeyBlockData(IndexStorage<int[]>[] 
keyStorageArray,
-      int entryCount) {
-    byte[][] keyBlockData = new byte[keyStorageArray.length][];
-    int destPos = 0;
-    int keyBlockSizePosition = -1;
-    for (int i = 0; i < keyStorageArray.length; i++) {
-      destPos = 0;
-      //handling for high card dims
-      if (!isComplexType[i] && !this.isDictionaryColumn[i]) {
-        int totalLength = 0;
-        // calc size of the total bytes in all the colmns.
-        for (int k = 0; k < keyStorageArray[i].getKeyBlock().length; k++) {
-          byte[] colValue = keyStorageArray[i].getKeyBlock()[k];
-          totalLength += colValue.length;
-        }
-        keyBlockData[i] = new byte[totalLength];
-
-        for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
-          int length = keyStorageArray[i].getKeyBlock()[j].length;
-          System
-              .arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos, length);
-          destPos += length;
-        }
-      } else {
-        keyBlockSizePosition++;
-        if (aggBlocks[i]) {
-          keyBlockData[i] = new byte[keyStorageArray[i].getTotalSize()];
-          for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
-            System.arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos,
-                keyStorageArray[i].getKeyBlock()[j].length);
-            destPos += keyStorageArray[i].getKeyBlock()[j].length;
-          }
-        } else {
-          if (isComplexType[i]) {
-            keyBlockData[i] = new byte[keyStorageArray[i].getKeyBlock().length
-                * keyBlockSize[keyBlockSizePosition]];
-          } else {
-            keyBlockData[i] = new byte[entryCount * keyBlockSize[keyBlockSizePosition]];
-          }
-          for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
-            System.arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos,
-                keyBlockSize[keyBlockSizePosition]);
-            destPos += keyBlockSize[keyBlockSizePosition];
-          }
-        }
-      }
-      keyBlockData[i] = SnappyByteCompression.INSTANCE.compress(keyBlockData[i]);
-    }
-    return keyBlockData;
-  }
-
-  /**
    * This method is responsible for writing blocklet to the data file
    *
   * @return file offset, the current position of the file
   * @throws CarbonDataWriterException if anything goes wrong while writing the leaf file
    */
-  protected long writeDataToFile(NodeHolder nodeHolder, FileChannel channel)
+  private long writeDataToFile(NodeHolder nodeHolder, FileChannel channel)
       throws CarbonDataWriterException {
     // create byte buffer
     byte[][] compressedIndex = nodeHolder.getCompressedIndex();
@@ -375,15 +240,20 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
       indexBlockSize += nodeHolder.getDataIndexMapLength()[i];
     }
     ByteBuffer byteBuffer = ByteBuffer.allocate(
-        nodeHolder.getKeyArray().length + nodeHolder.getDataArray().length + indexBlockSize);
+        nodeHolder.getTotalDimensionArrayLength() + nodeHolder.getTotalMeasureArrayLength()
+            + indexBlockSize);
     long offset = 0;
     try {
       // get the current offset
       offset = channel.size();
       // add key array to byte buffer
-      byteBuffer.put(nodeHolder.getKeyArray());
+      for (int i = 0; i < nodeHolder.getKeyArray().length; i++) {
+        byteBuffer.put(nodeHolder.getKeyArray()[i]);
+      }
       // add measure data array to byte buffer
-      byteBuffer.put(nodeHolder.getDataArray());
+      for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
+        byteBuffer.put(nodeHolder.getDataArray()[i]);
+      }
 
       ByteBuffer buffer1 = null;
       for (int i = 0; i < compressedIndex.length; i++) {
@@ -448,9 +318,9 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
     // add measure length
     info.setMeasureLength(nodeHolder.getMeasureLenght());
 
-    long[] msrOffset = new long[this.measureCount];
+    long[] msrOffset = new long[dataWriterVo.getMeasureCount()];
 
-    for (int i = 0; i < this.measureCount; i++) {
+    for (int i = 0; i < msrOffset.length; i++) {
       // increment the current offset by 4 bytes because 4 bytes will be
       // used for measure byte length
       //            offset += CarbonCommonConstants.INT_SIZE_IN_BYTE;
@@ -489,4 +359,21 @@ public class 
CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
     return info;
   }
 
+  /**
+   * This method will write the blocklet metadata at the end of the file in thrift format
+   */
+  protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
+      String filePath) throws CarbonDataWriterException {
+    try {
+      long currentPosition = channel.size();
+      CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
+      FileFooter convertFileMeta = CarbonMetadataUtil
+          .convertFileFooter(infoList, localCardinality.length, localCardinality,
+              thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
+      fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
+      writer.writeFooter(convertFileMeta, currentPosition);
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
+    }
+  }
 }
\ No newline at end of file
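
Note on the writer change above: NodeHolder's flat key and data byte arrays are now
arrays of per-column chunks, so the writer sizes its buffer from the precomputed
totals and appends each chunk in turn before flushing the blocklet. A minimal
sketch of that assembly step, using the accessors from this patch (holder and
indexBlockSize are assumed to be in scope, as in the writer):

    // Sketch only: assemble one blocklet's bytes before writing to the channel.
    ByteBuffer buffer = ByteBuffer.allocate(
        holder.getTotalDimensionArrayLength()
            + holder.getTotalMeasureArrayLength() + indexBlockSize);
    for (byte[] dimensionChunk : holder.getKeyArray()) {
      buffer.put(dimensionChunk);  // one chunk per dimension column
    }
    for (byte[] measureChunk : holder.getDataArray()) {
      buffer.put(measureChunk);    // one chunk per measure column
    }
    buffer.flip();                 // ready to write at the channel's current offset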

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
index aa758e6..a7d14f0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
@@ -27,12 +27,12 @@ public class NodeHolder {
   /**
    * keyArray
    */
-  private byte[] keyArray;
+  private byte[][] keyArray;
 
   /**
    * dataArray
    */
-  private byte[] dataArray;
+  private byte[][] dataArray;
 
   /**
    * measureLenght
@@ -139,30 +139,40 @@ public class NodeHolder {
   private BitSet[] measureNullValueIndex;
 
   /**
+   * total length of dimension values
+   */
+  private int totalDimensionArrayLength;
+
+  /**
+   * total length of all measure values
+   */
+  private int totalMeasureArrayLength;
+
+  /**
    * @return the keyArray
    */
-  public byte[] getKeyArray() {
+  public byte[][] getKeyArray() {
     return keyArray;
   }
 
   /**
    * @param keyArray the keyArray to set
    */
-  public void setKeyArray(byte[] keyArray) {
+  public void setKeyArray(byte[][] keyArray) {
     this.keyArray = keyArray;
   }
 
   /**
    * @return the dataArray
    */
-  public byte[] getDataArray() {
+  public byte[][] getDataArray() {
     return dataArray;
   }
 
   /**
    * @param dataArray the dataArray to set
    */
-  public void setDataArray(byte[] dataArray) {
+  public void setDataArray(byte[][] dataArray) {
     this.dataArray = dataArray;
   }
 
@@ -453,4 +463,20 @@ public class NodeHolder {
   public void setMeasureNullValueIndex(BitSet[] measureNullValueIndex) {
     this.measureNullValueIndex = measureNullValueIndex;
   }
+
+  public int getTotalDimensionArrayLength() {
+    return totalDimensionArrayLength;
+  }
+
+  public void setTotalDimensionArrayLength(int totalDimensionArrayLength) {
+    this.totalDimensionArrayLength = totalDimensionArrayLength;
+  }
+
+  public int getTotalMeasureArrayLength() {
+    return totalMeasureArrayLength;
+  }
+
+  public void setTotalMeasureArrayLength(int totalMeasureArrayLength) {
+    this.totalMeasureArrayLength = totalMeasureArrayLength;
+  }
 }
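
Note on NodeHolder: with keyArray and dataArray widened to byte[][], a single
array length no longer gives the blocklet size, which is why the two total-length
fields were added. A short sketch of how a producer might fill them (variable
names here are illustrative, not taken from this patch):

    // Illustrative: set per-column chunks plus the new total-length fields.
    NodeHolder holder = new NodeHolder();
    holder.setKeyArray(dimensionChunks);  // byte[][], one entry per dimension column
    holder.setDataArray(measureChunks);   // byte[][], one entry per measure column
    int totalKeySize = 0;
    for (byte[] chunk : dimensionChunks) {
      totalKeySize += chunk.length;
    }
    holder.setTotalDimensionArrayLength(totalKeySize);
    int totalMsrSize = 0;
    for (byte[] chunk : measureChunks) {
      totalMsrSize += chunk.length;
    }
    holder.setTotalMeasureArrayLength(totalMsrSize);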

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index 62b442f..84192b8 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -35,9 +35,12 @@ import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.test.util.StoreCreator;
 
 import junit.framework.TestCase;
+
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -45,10 +48,22 @@ public class BlockIndexStoreTest extends TestCase {
 
   private BlockIndexStore indexStore;
 
+  private String property;
+
   @BeforeClass public void setUp() {
+    // remember the configured file version so it can be restored after the tests
+    property = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1");
     StoreCreator.createCarbonStore();
     indexStore = BlockIndexStore.getInstance();
   }
+
+  @AfterClass public void tearDown() {
+    // restore the original file version, falling back to the default
+    if (null != property) {
+      CarbonProperties.getInstance()
+          .addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, property);
+    } else {
+      CarbonProperties.getInstance()
+          .addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+              CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION + "");
+    }
+  }
 
   @Test public void testloadAndGetTaskIdToSegmentsMapForSingleSegment() throws IOException {
     String canonicalPath =
@@ -56,7 +71,7 @@ public class BlockIndexStoreTest extends TestCase {
     File file = getPartFile();
     TableBlockInfo info =
         new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length());
+            file.length(), (short) 1);
     CarbonTableIdentifier carbonTableIdentifier =
             new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
     AbsoluteTableIdentifier absoluteTableIdentifier =
@@ -78,20 +93,20 @@ public class BlockIndexStoreTest extends TestCase {
     File file = getPartFile();
     TableBlockInfo info =
         new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length());
+            file.length(), (short) 1);
     TableBlockInfo info1 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length());
+            file.length(), (short) 1);
 
     TableBlockInfo info2 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length());
+            file.length(), (short) 1);
     TableBlockInfo info3 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length());
+            file.length(), (short) 1);
     TableBlockInfo info4 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length());
+            file.length(), (short) 1);
 
     CarbonTableIdentifier carbonTableIdentifier =
             new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
@@ -133,31 +148,31 @@ public class BlockIndexStoreTest extends TestCase {
     File file = getPartFile();
     TableBlockInfo info =
         new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length());
+            file.length(), (short) 1);
     TableBlockInfo info1 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length());
+            file.length(), (short) 1);
 
     TableBlockInfo info2 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length());
+            file.length(), (short) 1);
     TableBlockInfo info3 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length());
+            file.length(), (short) 1);
     TableBlockInfo info4 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length());
+            file.length(), (short) 1);
 
     TableBlockInfo info5 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
-            file.length());
+            file.length(), (short) 1);
     TableBlockInfo info6 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
-            file.length());
+            file.length(), (short) 1);
 
     TableBlockInfo info7 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "3", new String[] { "loclhost" },
-            file.length());
+            file.length(), (short) 1);
 
     CarbonTableIdentifier carbonTableIdentifier =
             new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
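
Note on the test changes: every TableBlockInfo now carries an explicit file-format
version, matching the CARBON_DATA_FILE_VERSION property pinned to "1" in setUp.
The call shape, with the final argument annotated (a sketch; the parameter names
are inferred from context, not from the constructor's declaration):

    TableBlockInfo info = new TableBlockInfo(
        file.getAbsolutePath(),       // path to the carbon data file
        0,                            // block offset within the file
        "0",                          // segment id
        new String[] { "loclhost" },  // block locations, spelling as in the tests
        file.length(),                // block length in bytes
        (short) 1);                   // carbon data file format version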
