http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
new file mode 100644
index 0000000..b84d695
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.merger;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+
+/**
+ * Serializable holder that groups a table identifier, a store path and the table object.
+ *
+ * NOTE(review): serialization only works if CarbonTableIdentifier and CarbonTable are
+ * themselves serializable; confirm against their declarations.
+ */
+public class TableMeta implements Serializable {
+
+  private static final long serialVersionUID = -1749874611119829431L;
+
+  /** Identifier of the table this holder refers to. */
+  public CarbonTableIdentifier carbonTableIdentifier;
+
+  /** Store path handed in together with the table. */
+  public String storePath;
+
+  /** The table object itself. */
+  public CarbonTable carbonTable;
+
+  /**
+   * Creates a holder for the given values; no validation or copying is performed.
+   *
+   * @param carbonTableIdentifier identifier of the table
+   * @param storePath             store path of the table
+   * @param carbonTable           the table object
+   */
+  public TableMeta(CarbonTableIdentifier carbonTableIdentifier, String storePath,
+      CarbonTable carbonTable) {
+    this.carbonTableIdentifier = carbonTableIdentifier;
+    this.storePath = storePath;
+    this.carbonTable = carbonTable;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/TupleConversionAdapter.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/TupleConversionAdapter.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/TupleConversionAdapter.java
new file mode 100644
index 0000000..f508a87
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/TupleConversionAdapter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.merger;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+
+/**
+ * Converts a raw result tuple into the row layout used by the data writer.
+ *
+ * Output layout: the measure values, then the no-dictionary key bytes (only when the segment
+ * has at least one no-dictionary dimension), then the dictionary key.
+ */
+class TupleConversionAdapter {
+
+  /** Number of measure values copied from every input tuple. */
+  private final int measureCount;
+
+  /** True when the segment has at least one no-dictionary dimension. */
+  private final boolean isNoDictionaryPresent;
+
+  /** Length of every converted row. */
+  private final int rowLength;
+
+  /**
+   * @param segmentProperties supplies the measure list and the no-dictionary dimension count
+   */
+  public TupleConversionAdapter(SegmentProperties segmentProperties) {
+    this.measureCount = segmentProperties.getMeasures().size();
+    this.isNoDictionaryPresent = segmentProperties.getNumberOfNoDictionaryDimension() > 0;
+    // measures + optional no-dictionary slot + dictionary key slot
+    // NOTE(review): complex dimensions do not reserve a slot here, whereas the handler model
+    // built in CarbonFactDataHandlerModel shifts the MDKey index for them as well; confirm
+    // that complex types cannot reach this adapter.
+    this.rowLength = measureCount + (isNoDictionaryPresent ? 1 : 0) + 1;
+  }
+
+  /**
+   * Converts the raw result to the format understandable by the data writer.
+   *
+   * @param carbonTuple tuple whose element 0 is a ByteArrayWrapper holding the keys and whose
+   *                    elements 1..measureCount are the measure values
+   * @return a new row: measures, optional no-dictionary keys, dictionary key
+   */
+  public Object[] getObjectArray(Object[] carbonTuple) {
+    Object[] row = new Object[rowLength];
+    // measures start at index 1 of the tuple; index 0 carries the key wrapper
+    System.arraycopy(carbonTuple, 1, row, 0, measureCount);
+    ByteArrayWrapper keys = (ByteArrayWrapper) carbonTuple[0];
+    int index = measureCount;
+
+    // put no-dictionary byte[] (only when such dimensions exist)
+    if (isNoDictionaryPresent) {
+      row[index++] = keys.getNoDictionaryKeys();
+    }
+
+    // put the dictionary key last
+    row[index] = keys.getDictionaryKey();
+    return row;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/exeception/SliceMergerException.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/exeception/SliceMergerException.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/exeception/SliceMergerException.java
new file mode 100644
index 0000000..3ae3604
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/exeception/SliceMergerException.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.merger.exeception;
+
+import java.util.Locale;
+
+/**
+ * Checked exception of the merger package carrying an error message and an optional cause.
+ */
+public class SliceMergerException extends Exception {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The error message; mirrors the detail message passed to the superclass.
+   */
+  private final String msg;
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public SliceMergerException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   * @param t   The cause of this exception.
+   */
+  public SliceMergerException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * This method is used to get the localized message. No translation is available, so the
+   * plain error message is returned instead of silently dropping it.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region.
+   * @return - The error message, or an empty string when no message was supplied.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return msg == null ? "" : msg;
+  }
+
+  /**
+   * getLocalizedMessage
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * getMessage
+   *
+   * @return The error message given at construction time.
+   */
+  @Override public String getMessage() {
+    return this.msg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 4ebb2fb..3980bcf 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -158,9 +158,7 @@ public final class DataLoadProcessBuilder {
         loadModel.getIsEmptyDataBadRecord().split(",")[1]);
     
configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH,
         loadModel.getFactFilePath());
-    if 
(CarbonMetadata.getInstance().getCarbonTable(carbonTable.getTableUniqueName()) 
== null) {
-      CarbonMetadata.getInstance().addCarbonTable(carbonTable);
-    }
+    CarbonMetadata.getInstance().addCarbonTable(carbonTable);
     List<CarbonDimension> dimensions =
         carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
     List<CarbonMeasure> measures =

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
index f10e73a..a85a34f 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
@@ -22,9 +22,7 @@ import java.util.Iterator;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.IgnoreDictionary;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
@@ -50,8 +48,6 @@ public class DataWriterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
 
   private SegmentProperties segmentProperties;
 
-  private KeyGenerator keyGenerator;
-
   private int noDictionaryCount;
 
   private int complexDimensionCount;
@@ -60,12 +56,6 @@ public class DataWriterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
 
   private long readCounter;
 
-  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
-
-  private int noDimByteArrayIndex = 
IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
-
-  private int dimsArrayIndex = 
IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
-
   public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
     super(configuration, child);
@@ -101,8 +91,6 @@ public class DataWriterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
       complexDimensionCount = configuration.getComplexDimensionCount();
       measureCount = dataHandlerModel.getMeasureCount();
       segmentProperties = dataHandlerModel.getSegmentProperties();
-      keyGenerator = segmentProperties.getDimensionKeyGenerator();
-
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
           
.recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
               System.currentTimeMillis());
@@ -183,27 +171,10 @@ public class DataWriterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
       while (batch.hasNext()) {
         CarbonRow row = batch.next();
         readCounter++;
-        /*
-        * The order of the data is as follows,
-        * Measuredata, nodictionary/complex byte array data, dictionary(MDK 
generated key)
-        */
-        int len;
-        // adding one for the high cardinality dims byte array.
-        if (noDictionaryCount > 0 || complexDimensionCount > 0) {
-          len = measureCount + 1 + 1;
-        } else {
-          len = measureCount + 1;
-        }
-        Object[] outputRow = new Object[len];
-
-
-        int l = 0;
-        Object[] measures = row.getObjectArray(measureIndex);
-        for (int i = 0; i < measureCount; i++) {
-          outputRow[l++] = measures[i];
-        }
-        outputRow[l] = row.getObject(noDimByteArrayIndex);
-        outputRow[len - 1] = 
keyGenerator.generateKey(row.getIntArray(dimsArrayIndex));
+        // convert the row from surrogate key to MDKey
+        Object[] outputRow = CarbonDataProcessorUtil
+            .convertToMDKeyAndFillRow(row, segmentProperties, measureCount, 
noDictionaryCount,
+                complexDimensionCount);
         dataHandler.addDataToStore(outputRow);
       }
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index d42dc32..41e3018 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -423,8 +423,7 @@ public class SortParameters {
 
   public static SortParameters createSortParameters(String databaseName, 
String tableName,
       int dimColCount, int complexDimColCount, int measureColCount, int 
noDictionaryCount,
-      String partitionID, String segmentId, String taskNo,
-      boolean[] noDictionaryColMaping) {
+      String partitionID, String segmentId, String taskNo, boolean[] 
noDictionaryColMaping) {
     SortParameters parameters = new SortParameters();
     CarbonProperties carbonProperties = CarbonProperties.getInstance();
     parameters.setDatabaseName(databaseName);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index ffd23a2..e64caea 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.processing.store;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -29,12 +30,16 @@ import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import 
org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -294,6 +299,73 @@ public class CarbonFactDataHandlerModel {
   }
 
   /**
+   * Creates the model object for the carbon fact data handler from the given load model,
+   * table and segment properties. The model is flagged as a data writing request, and its
+   * MDKey index is placed after the measures, shifted by one more position when the segment
+   * has no-dictionary or complex dimensions.
+   *
+   * @param loadModel         supplies the database name, store path, partition id, segment id
+   *                          and the load schema used for the inverted index flags
+   * @param carbonTable       supplies the column schema and the block size
+   * @param segmentProperties supplies measures, dimensions, cardinalities and key generator
+   * @param tableName         table whose dimensions and measures are looked up
+   * @param tempStoreLocation set as the store location of the model
+   * @return the populated handler model
+   */
+  public static CarbonFactDataHandlerModel 
getCarbonFactDataHandlerModel(CarbonLoadModel loadModel,
+      CarbonTable carbonTable, SegmentProperties segmentProperties, String 
tableName,
+      String tempStoreLocation) {
+    CarbonFactDataHandlerModel carbonFactDataHandlerModel = new 
CarbonFactDataHandlerModel();
+    carbonFactDataHandlerModel.setDatabaseName(loadModel.getDatabaseName());
+    carbonFactDataHandlerModel.setTableName(tableName);
+    
carbonFactDataHandlerModel.setMeasureCount(segmentProperties.getMeasures().size());
+    carbonFactDataHandlerModel
+        
.setMdKeyLength(segmentProperties.getDimensionKeyGenerator().getKeySizeInBytes());
+    carbonFactDataHandlerModel.setStoreLocation(tempStoreLocation);
+    
carbonFactDataHandlerModel.setDimLens(segmentProperties.getDimColumnsCardinality());
+    carbonFactDataHandlerModel.setSegmentProperties(segmentProperties);
+    carbonFactDataHandlerModel
+        
.setNoDictionaryCount(segmentProperties.getNumberOfNoDictionaryDimension());
+    carbonFactDataHandlerModel.setDimensionCount(
+        segmentProperties.getDimensions().size() - carbonFactDataHandlerModel
+            .getNoDictionaryCount());
+    List<ColumnSchema> wrapperColumnSchema = CarbonUtil
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(tableName),
+            carbonTable.getMeasureByTableName(tableName));
+    carbonFactDataHandlerModel.setWrapperColumnSchema(wrapperColumnSchema);
+    // get the cardinality for all the columns including no dictionary 
columns
+    int[] formattedCardinality = CarbonUtil
+        .getFormattedCardinality(segmentProperties.getDimColumnsCardinality(), 
wrapperColumnSchema);
+    carbonFactDataHandlerModel.setColCardinality(formattedCardinality);
+    // TODO: complex types still need to be handled here; the map is created empty.
+    Map<Integer, GenericDataType> complexIndexMap =
+        new HashMap<Integer, 
GenericDataType>(segmentProperties.getComplexDimensions().size());
+    carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);
+    carbonFactDataHandlerModel.setDataWritingRequest(true);
+    char[] aggType = new char[segmentProperties.getMeasures().size()];
+    Arrays.fill(aggType, 'n');
+    int i = 0;
+    for (CarbonMeasure msr : segmentProperties.getMeasures()) {
+      aggType[i++] = DataTypeUtil.getAggType(msr.getDataType());
+    }
+    carbonFactDataHandlerModel.setAggType(aggType);
+    
carbonFactDataHandlerModel.setFactDimLens(segmentProperties.getDimColumnsCardinality());
+    String carbonDataDirectoryPath = CarbonDataProcessorUtil
+        .checkAndCreateCarbonStoreLocation(loadModel.getStorePath(), 
loadModel.getDatabaseName(),
+            tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
+    
carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
+    List<CarbonDimension> dimensionByTableName =
+        
loadModel.getCarbonDataLoadSchema().getCarbonTable().getDimensionByTableName(tableName);
+    boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
+    int index = 0;
+    for (CarbonDimension dimension : dimensionByTableName) {
+      isUseInvertedIndexes[index++] = dimension.isUseInvertedIndex();
+    }
+    carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndexes);
+    
carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
+    
carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
+    if (segmentProperties.getNumberOfNoDictionaryDimension() > 0
+        || segmentProperties.getComplexDimensions().size() > 0) {
+      
carbonFactDataHandlerModel.setMdKeyIndex(segmentProperties.getMeasures().size() 
+ 1);
+    } else {
+      
carbonFactDataHandlerModel.setMdKeyIndex(segmentProperties.getMeasures().size());
+    }
+    return carbonFactDataHandlerModel;
+  }
+
+  /**
    * This method will get the store location for the given path, segment id 
and partition id
    *
    * @return data directory path

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 27662a4..178b43b 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -33,11 +33,15 @@ import java.util.Set;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.IgnoreDictionary;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -53,6 +57,7 @@ import 
org.apache.carbondata.processing.datatypes.PrimitiveDataType;
 import org.apache.carbondata.processing.datatypes.StructDataType;
 import org.apache.carbondata.processing.model.CarbonDataLoadSchema;
 import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
 
 import org.apache.commons.lang3.ArrayUtils;
 
@@ -431,4 +436,74 @@ public final class CarbonDataProcessorUtil {
     return dateformatsHashMap;
   }
 
+  /**
+   * Converts the surrogate keys of a row into an MDKey and lays the row out in the order
+   * required by the writer: measure values, then the no-dictionary/complex byte array
+   * (only when such columns exist), then the generated MDKey.
+   *
+   * @param row                   row holding the measures, the byte array data and the
+   *                              surrogate keys
+   * @param segmentProperties     supplies the dimension cardinalities and the key generator
+   * @param measureCount          number of measure values to copy
+   * @param noDictionaryCount     number of no-dictionary dimensions
+   * @param complexDimensionCount number of complex dimensions
+   * @return the row in the writer format
+   * @throws KeyGenException propagated from the dimension key generator
+   */
+  public static Object[] convertToMDKeyAndFillRow(CarbonRow row,
+      SegmentProperties segmentProperties, int measureCount, int noDictionaryCount,
+      int complexDimensionCount) throws KeyGenException {
+    // one slot for the MDKey, plus one for the high cardinality dims byte array if present
+    boolean hasByteArraySlot = noDictionaryCount > 0 || complexDimensionCount > 0;
+    Object[] writerRow = new Object[measureCount + (hasByteArraySlot ? 2 : 1)];
+    Object[] measures = row.getObjectArray(IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex());
+    for (int m = 0; m < measureCount; m++) {
+      writerRow[m] = measures[m];
+    }
+    // without a byte array slot this is the last position and the MDKey below overwrites it
+    writerRow[measureCount] = row.getObject(IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex());
+    // only the leading entries covered by the dimension cardinalities go into the MDKey
+    int[] surrogateKeys = new int[segmentProperties.getDimColumnsCardinality().length];
+    int[] dimsArray = row.getIntArray(IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex());
+    for (int d = 0; d < surrogateKeys.length; d++) {
+      surrogateKeys[d] = dimsArray[d];
+    }
+    writerRow[writerRow.length - 1] =
+        segmentProperties.getDimensionKeyGenerator().generateKey(surrogateKeys);
+    return writerRow;
+  }
+
+  /**
+   * Resolves the carbon data directory for the given partition and segment of a table and
+   * passes it to CarbonUtil.checkAndCreateFolder before returning it.
+   *
+   * @param factStoreLocation store location used to build the table path
+   * @param databaseName      database name, first part of the table lookup key
+   * @param tableName         table name, second part of the table lookup key
+   * @param partitionId       partition id of the data directory
+   * @param segmentId         segment id of the data directory
+   * @return data directory path
+   */
+  public static String checkAndCreateCarbonStoreLocation(String factStoreLocation,
+      String databaseName, String tableName, String partitionId, String segmentId) {
+    // the table is looked up in the metadata under "<database>_<table>"
+    String tableUniqueName = databaseName + CarbonCommonConstants.UNDERSCORE + tableName;
+    CarbonTable table = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName);
+    CarbonTablePath tablePath =
+        CarbonStorePath.getCarbonTablePath(factStoreLocation, table.getCarbonTableIdentifier());
+    String dataDirectoryPath = tablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
+    CarbonUtil.checkAndCreateFolder(dataDirectoryPath);
+    return dataDirectoryPath;
+  }
+
+  /**
+   * Initialises the aggregation type of each measure, used for their storage format.
+   *
+   * @param carbonTable  table that supplies the measures
+   * @param tableName    table name used to look up the measures
+   * @param measureCount number of leading measures to read; also the length of the result
+   * @return one aggregation type character per measure
+   */
+  public static char[] initAggType(CarbonTable carbonTable, String tableName, int measureCount) {
+    char[] types = new char[measureCount];
+    Arrays.fill(types, 'n');
+    List<CarbonMeasure> tableMeasures = carbonTable.getMeasureByTableName(tableName);
+    int position = 0;
+    while (position < measureCount) {
+      CarbonMeasure measure = tableMeasures.get(position);
+      types[position] = DataTypeUtil.getAggType(measure.getDataType());
+      position++;
+    }
+    return types;
+  }
+
 }
\ No newline at end of file

Reply via email to