Revert "[CARBONDATA-2018][DataLoad] Optimization in reading/writing for sort 
temp row"

This reverts commit de92ea9a123b17d903f2d1d4662299315c792954.


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/78aa2cc3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/78aa2cc3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/78aa2cc3

Branch: refs/heads/carbonstore-rebase5
Commit: 78aa2cc3d1e52bf940138538a1f8c0bdfac666bc
Parents: 22bb333
Author: Jacky Li <jacky.li...@qq.com>
Authored: Sat Feb 10 20:11:25 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Fri Mar 2 15:52:35 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/util/NonDictionaryUtil.java |  67 ++-
 .../presto/util/CarbonDataStoreCreator.scala    |   1 +
 .../load/DataLoadProcessorStepOnSpark.scala     |   6 +-
 .../loading/row/IntermediateSortTempRow.java    | 117 -----
 .../loading/sort/SortStepRowHandler.java        | 466 -------------------
 .../loading/sort/SortStepRowUtil.java           | 103 ++++
 .../sort/unsafe/UnsafeCarbonRowPage.java        | 331 +++++++++++--
 .../loading/sort/unsafe/UnsafeSortDataRows.java |  57 ++-
 .../unsafe/comparator/UnsafeRowComparator.java  |  95 ++--
 .../UnsafeRowComparatorForNormalDIms.java       |  59 +++
 .../UnsafeRowComparatorForNormalDims.java       |  59 ---
 .../sort/unsafe/holder/SortTempChunkHolder.java |   3 +-
 .../holder/UnsafeFinalMergePageHolder.java      |  19 +-
 .../unsafe/holder/UnsafeInmemoryHolder.java     |  21 +-
 .../holder/UnsafeSortTempFileChunkHolder.java   | 138 ++++--
 .../merger/UnsafeIntermediateFileMerger.java    | 118 ++++-
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  27 +-
 .../merger/CompactionResultSortProcessor.java   |   1 +
 .../sort/sortdata/IntermediateFileMerger.java   |  95 +++-
 .../IntermediateSortTempRowComparator.java      |  73 ---
 .../sort/sortdata/NewRowComparator.java         |   5 +-
 .../sortdata/NewRowComparatorForNormalDims.java |   3 +-
 .../processing/sort/sortdata/RowComparator.java |  94 ++++
 .../sortdata/RowComparatorForNormalDims.java    |  62 +++
 .../SingleThreadFinalSortFilesMerger.java       |  25 +-
 .../processing/sort/sortdata/SortDataRows.java  |  85 +++-
 .../sort/sortdata/SortTempFileChunkHolder.java  | 174 +++++--
 .../sort/sortdata/TableFieldStat.java           | 176 -------
 28 files changed, 1294 insertions(+), 1186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
index fca1244..d6ecfbc 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
@@ -82,26 +82,18 @@ public class NonDictionaryUtil {
   }
 
   /**
-   * Method to get the required dictionary Dimension from obj []
+   * Method to get the required Dimension from obj []
    *
    * @param index
    * @param row
    * @return
    */
-  public static int getDictDimension(int index, Object[] row) {
-    int[] dimensions = (int[]) row[WriteStepRowUtil.DICTIONARY_DIMENSION];
+  public static Integer getDimension(int index, Object[] row) {
+
+    Integer[] dimensions = (Integer[]) 
row[WriteStepRowUtil.DICTIONARY_DIMENSION];
+
     return dimensions[index];
-  }
 
-  /**
-   * Method to get the required non-dictionary & complex from 3-parted row
-   * @param index
-   * @param row
-   * @return
-   */
-  public static byte[] getNoDictOrComplex(int index, Object[] row) {
-    byte[][] nonDictArray = (byte[][]) 
row[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
-    return nonDictArray[index];
   }
 
   /**
@@ -116,11 +108,60 @@ public class NonDictionaryUtil {
     return measures[index];
   }
 
+  public static byte[] getByteArrayForNoDictionaryCols(Object[] row) {
+
+    return (byte[]) row[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
+  }
+
   public static void prepareOutObj(Object[] out, int[] dimArray, byte[][] 
byteBufferArr,
       Object[] measureArray) {
+
     out[WriteStepRowUtil.DICTIONARY_DIMENSION] = dimArray;
     out[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX] = byteBufferArr;
     out[WriteStepRowUtil.MEASURE] = measureArray;
 
   }
+
+  /**
+   * This method will extract the single dimension from the complete high card 
dims byte[].+     *
+   * The format of the byte [] will be,  Totallength,CompleteStartOffsets,Dat
+   *
+   * @param highCardArr
+   * @param index
+   * @param highCardinalityCount
+   * @param outBuffer
+   */
+  public static void extractSingleHighCardDims(byte[] highCardArr, int index,
+      int highCardinalityCount, ByteBuffer outBuffer) {
+    ByteBuffer buff = null;
+    short secIndex = 0;
+    short firstIndex = 0;
+    int length;
+    // if the requested index is a last one then we need to calculate length
+    // based on byte[] length.
+    if (index == highCardinalityCount - 1) {
+      // need to read 2 bytes(1 short) to determine starting offset and
+      // length can be calculated by array length.
+      buff = ByteBuffer.wrap(highCardArr, (index * 2) + 2, 2);
+    } else {
+      // need to read 4 bytes(2 short) to determine starting offset and
+      // length.
+      buff = ByteBuffer.wrap(highCardArr, (index * 2) + 2, 4);
+    }
+
+    firstIndex = buff.getShort();
+    // if it is a last dimension in high card then this will be last
+    // offset.so calculate length from total length
+    if (index == highCardinalityCount - 1) {
+      secIndex = (short) highCardArr.length;
+    } else {
+      secIndex = buff.getShort();
+    }
+
+    length = secIndex - firstIndex;
+
+    outBuffer.position(firstIndex);
+    outBuffer.limit(outBuffer.position() + length);
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git 
a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
 
b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 7203278..1d7c791 100644
--- 
a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ 
b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -383,6 +383,7 @@ object CarbonDataStoreCreator {
       .getInstance.createCache(CacheType.REVERSE_DICTIONARY)
 
     for (i <- set.indices) {
+      //      val dim = getDimension(dims, i).get
       val columnIdentifier: ColumnIdentifier =
         new ColumnIdentifier(dims.get(i).getColumnId, null, null)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 0422239..5124247 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -35,7 +35,7 @@ import 
org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
 import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler
+import org.apache.carbondata.processing.loading.sort.SortStepRowUtil
 import 
org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl
 import org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, 
CarbonFactHandlerFactory}
@@ -206,7 +206,7 @@ object DataLoadProcessorStepOnSpark {
     val model: CarbonLoadModel = 
modelBroadcast.value.getCopyWithTaskNo(index.toString)
     val conf = DataLoadProcessBuilder.createConfiguration(model)
     val sortParameters = SortParameters.createSortParameters(conf)
-    val sortStepRowHandler = new SortStepRowHandler(sortParameters)
+    val sortStepRowUtil = new SortStepRowUtil(sortParameters)
     TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) 
=>
       wrapException(e, model)
     }
@@ -216,7 +216,7 @@ object DataLoadProcessorStepOnSpark {
 
       override def next(): CarbonRow = {
         val row =
-          new 
CarbonRow(sortStepRowHandler.convertRawRowTo3Parts(rows.next().getData))
+          new CarbonRow(sortStepRowUtil.convertRow(rows.next().getData))
         rowCounter.add(1)
         row
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
deleted file mode 100644
index 8d351cf..0000000
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.loading.row;
-
-import java.nio.ByteBuffer;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.DataTypeUtil;
-
-/**
- * During sort procedure, each row will be written to sort temp file in this 
logic format.
- * an intermediate sort temp row consists 3 parts:
- * dictSort, noDictSort, noSortDimsAndMeasures(dictNoSort, noDictNoSort, 
measure)
- */
-public class IntermediateSortTempRow {
-  private int[] dictSortDims;
-  private byte[][] noDictSortDims;
-  private byte[] noSortDimsAndMeasures;
-
-  public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims,
-      byte[] noSortDimsAndMeasures) {
-    this.dictSortDims = dictSortDims;
-    this.noDictSortDims = noDictSortDims;
-    this.noSortDimsAndMeasures = noSortDimsAndMeasures;
-  }
-
-  public int[] getDictSortDims() {
-    return dictSortDims;
-  }
-
-  public byte[][] getNoDictSortDims() {
-    return noDictSortDims;
-  }
-
-  public byte[] getNoSortDimsAndMeasures() {
-    return noSortDimsAndMeasures;
-  }
-
-  /**
-   * deserialize from bytes array to get the no sort fields
-   * @param outDictNoSort stores the dict & no-sort fields
-   * @param outNoDictNoSort stores the no-dict & no-sort fields, including 
complex
-   * @param outMeasures stores the measure fields
-   * @param dataTypes data type for the measure
-   */
-  public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] 
outNoDictNoSort,
-      Object[] outMeasures, DataType[] dataTypes) {
-    ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures);
-    // read dict_no_sort
-    int dictNoSortCnt = outDictNoSort.length;
-    for (int i = 0; i < dictNoSortCnt; i++) {
-      outDictNoSort[i] = rowBuffer.getInt();
-    }
-
-    // read no_dict_no_sort (including complex)
-    int noDictNoSortCnt = outNoDictNoSort.length;
-    for (int i = 0; i < noDictNoSortCnt; i++) {
-      short len = rowBuffer.getShort();
-      byte[] bytes = new byte[len];
-      rowBuffer.get(bytes);
-      outNoDictNoSort[i] = bytes;
-    }
-
-    // read measure
-    int measureCnt = outMeasures.length;
-    DataType tmpDataType;
-    Object tmpContent;
-    for (short idx = 0 ; idx < measureCnt; idx++) {
-      if ((byte) 0 == rowBuffer.get()) {
-        outMeasures[idx] = null;
-        continue;
-      }
-
-      tmpDataType = dataTypes[idx];
-      if (DataTypes.BOOLEAN == tmpDataType) {
-        if ((byte) 1 == rowBuffer.get()) {
-          tmpContent = true;
-        } else {
-          tmpContent = false;
-        }
-      } else if (DataTypes.SHORT == tmpDataType) {
-        tmpContent = rowBuffer.getShort();
-      } else if (DataTypes.INT == tmpDataType) {
-        tmpContent = rowBuffer.getInt();
-      } else if (DataTypes.LONG == tmpDataType) {
-        tmpContent = rowBuffer.getLong();
-      } else if (DataTypes.DOUBLE == tmpDataType) {
-        tmpContent = rowBuffer.getDouble();
-      } else if (DataTypes.isDecimal(tmpDataType)) {
-        short len = rowBuffer.getShort();
-        byte[] decimalBytes = new byte[len];
-        rowBuffer.get(decimalBytes);
-        tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes);
-      } else {
-        throw new IllegalArgumentException("Unsupported data type: " + 
tmpDataType);
-      }
-      outMeasures[idx] = tmpContent;
-    }
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
deleted file mode 100644
index f31a2b9..0000000
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ /dev/null
@@ -1,466 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.sort;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
-
-/**
- * This class is used to convert/write/read row in sort step in carbondata.
- * It consists the following function:
- * 1. convert raw row & intermediate sort temp row to 3-parted row
- * 2. read/write intermediate sort temp row to sort temp file & unsafe memory
- * 3. write raw row directly to sort temp file & unsafe memory as intermediate 
sort temp row
- */
-public class SortStepRowHandler implements Serializable {
-  private static final long serialVersionUID = 1L;
-  private int dictSortDimCnt = 0;
-  private int dictNoSortDimCnt = 0;
-  private int noDictSortDimCnt = 0;
-  private int noDictNoSortDimCnt = 0;
-  private int measureCnt;
-
-  // indices for dict & sort dimension columns
-  private int[] dictSortDimIdx;
-  // indices for dict & no-sort dimension columns
-  private int[] dictNoSortDimIdx;
-  // indices for no-dict & sort dimension columns
-  private int[] noDictSortDimIdx;
-  // indices for no-dict & no-sort dimension columns, including complex columns
-  private int[] noDictNoSortDimIdx;
-  // indices for measure columns
-  private int[] measureIdx;
-
-  private DataType[] dataTypes;
-
-  /**
-   * constructor
-   * @param tableFieldStat table field stat
-   */
-  public SortStepRowHandler(TableFieldStat tableFieldStat) {
-    this.dictSortDimCnt = tableFieldStat.getDictSortDimCnt();
-    this.dictNoSortDimCnt = tableFieldStat.getDictNoSortDimCnt();
-    this.noDictSortDimCnt = tableFieldStat.getNoDictSortDimCnt();
-    this.noDictNoSortDimCnt = tableFieldStat.getNoDictNoSortDimCnt();
-    this.measureCnt = tableFieldStat.getMeasureCnt();
-    this.dictSortDimIdx = tableFieldStat.getDictSortDimIdx();
-    this.dictNoSortDimIdx = tableFieldStat.getDictNoSortDimIdx();
-    this.noDictSortDimIdx = tableFieldStat.getNoDictSortDimIdx();
-    this.noDictNoSortDimIdx = tableFieldStat.getNoDictNoSortDimIdx();
-    this.measureIdx = tableFieldStat.getMeasureIdx();
-    this.dataTypes = tableFieldStat.getMeasureDataType();
-  }
-
-  /**
-   * constructor
-   * @param sortParameters sort parameters
-   */
-  public SortStepRowHandler(SortParameters sortParameters) {
-    this(new TableFieldStat(sortParameters));
-  }
-
-  /**
-   * Convert carbon row from raw format to 3-parted format.
-   * This method is used in global-sort.
-   *
-   * @param row raw row whose length is the same as field number
-   * @return 3-parted row whose length is 3. (1 for dict dims ,1 for non-dict 
and complex,
-   * 1 for measures)
-   */
-  public Object[] convertRawRowTo3Parts(Object[] row) {
-    Object[] holder = new Object[3];
-    try {
-      int[] dictDims
-          = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
-      byte[][] nonDictArray = new byte[this.noDictSortDimCnt + 
this.noDictNoSortDimCnt][];
-      Object[] measures = new Object[this.measureCnt];
-
-      // convert dict & data
-      int idxAcc = 0;
-      for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
-        dictDims[idxAcc++] = (int) row[this.dictSortDimIdx[idx]];
-      }
-
-      // convert dict & no-sort
-      for (int idx = 0; idx < this.dictNoSortDimCnt; idx++) {
-        dictDims[idxAcc++] = (int) row[this.dictNoSortDimIdx[idx]];
-      }
-      // convert no-dict & sort
-      idxAcc = 0;
-      for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-        nonDictArray[idxAcc++] = (byte[]) row[this.noDictSortDimIdx[idx]];
-      }
-      // convert no-dict & no-sort
-      for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
-        nonDictArray[idxAcc++] = (byte[]) row[this.noDictNoSortDimIdx[idx]];
-      }
-
-      // convert measure data
-      for (int idx = 0; idx < this.measureCnt; idx++) {
-        measures[idx] = row[this.measureIdx[idx]];
-      }
-
-      NonDictionaryUtil.prepareOutObj(holder, dictDims, nonDictArray, 
measures);
-    } catch (Exception e) {
-      throw new RuntimeException("Problem while converting row to 3 parts", e);
-    }
-    return holder;
-  }
-
-  /**
-   * Convert intermediate sort temp row to 3-parted row.
-   * This method is used in the final merge sort to feed rows to the next 
write step.
-   *
-   * @param sortTempRow intermediate sort temp row
-   * @return 3-parted row
-   */
-  public Object[] 
convertIntermediateSortTempRowTo3Parted(IntermediateSortTempRow sortTempRow) {
-    int[] dictDims
-        = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
-    byte[][] noDictArray
-        = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt][];
-
-    int[] dictNoSortDims = new int[this.dictNoSortDimCnt];
-    byte[][] noDictNoSortDims = new byte[this.noDictNoSortDimCnt][];
-    Object[] measures = new Object[this.measureCnt];
-
-    sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortDims, 
measures, this.dataTypes);
-
-    // dict dims
-    System.arraycopy(sortTempRow.getDictSortDims(), 0 , dictDims,
-        0, this.dictSortDimCnt);
-    System.arraycopy(dictNoSortDims, 0, dictDims,
-        this.dictSortDimCnt, this.dictNoSortDimCnt);;
-
-    // no dict dims, including complex
-    System.arraycopy(sortTempRow.getNoDictSortDims(), 0,
-        noDictArray, 0, this.noDictSortDimCnt);
-    System.arraycopy(noDictNoSortDims, 0, noDictArray,
-        this.noDictSortDimCnt, this.noDictNoSortDimCnt);
-
-    // measures are already here
-
-    Object[] holder = new Object[3];
-    NonDictionaryUtil.prepareOutObj(holder, dictDims, noDictArray, measures);
-    return holder;
-  }
-
-  /**
-   * Read intermediate sort temp row from InputStream.
-   * This method is used during the merge sort phase to read row from sort 
temp file.
-   *
-   * @param inputStream input stream
-   * @return a row that contains three parts
-   * @throws IOException if error occrus while reading from stream
-   */
-  public IntermediateSortTempRow readIntermediateSortTempRowFromInputStream(
-      DataInputStream inputStream) throws IOException {
-    int[] dictSortDims = new int[this.dictSortDimCnt];
-    byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][];
-
-    // read dict & sort dim data
-    for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
-      dictSortDims[idx] = inputStream.readInt();
-    }
-
-    // read no-dict & sort data
-    for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      short len = inputStream.readShort();
-      byte[] bytes = new byte[len];
-      inputStream.readFully(bytes);
-      noDictSortDims[idx] = bytes;
-    }
-
-    // read no-dict dims & measures
-    int len = inputStream.readInt();
-    byte[] noSortDimsAndMeasures = new byte[len];
-    inputStream.readFully(noSortDimsAndMeasures);
-
-    return new IntermediateSortTempRow(dictSortDims, noDictSortDims, 
noSortDimsAndMeasures);
-  }
-
-  /**
-   * Write intermediate sort temp row to OutputStream
-   * This method is used during the merge sort phase to write row to sort temp 
file.
-   *
-   * @param sortTempRow intermediate sort temp row
-   * @param outputStream output stream
-   * @throws IOException if error occurs while writing to stream
-   */
-  public void 
writeIntermediateSortTempRowToOutputStream(IntermediateSortTempRow sortTempRow,
-      DataOutputStream outputStream) throws IOException {
-    // write dict & sort dim
-    for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
-      outputStream.writeInt(sortTempRow.getDictSortDims()[idx]);
-    }
-
-    // write no-dict & sort dim
-    for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      byte[] bytes = sortTempRow.getNoDictSortDims()[idx];
-      outputStream.writeShort(bytes.length);
-      outputStream.write(bytes);
-    }
-
-    // write packed no-sort dim & measure
-    outputStream.writeInt(sortTempRow.getNoSortDimsAndMeasures().length);
-    outputStream.write(sortTempRow.getNoSortDimsAndMeasures());
-  }
-
-  /**
-   * Write raw row as an intermediate sort temp row to sort temp file.
-   * This method is used in the beginning of the sort phase. Comparing with 
converting raw row to
-   * intermediate sort temp row and then writing the converted one, Writing 
raw row directly will
-   * save the intermediate trivial loss.
-   * This method use an array backend buffer to save memory allocation. The 
buffer will be reused
-   * for all rows (per thread).
-   *
-   * @param row raw row
-   * @param outputStream output stream
-   * @param rowBuffer array backend buffer
-   * @throws IOException if error occurs while writing to stream
-   */
-  public void writeRawRowAsIntermediateSortTempRowToOutputStream(Object[] row,
-      DataOutputStream outputStream, ByteBuffer rowBuffer) throws IOException {
-    // write dict & sort
-    for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
-      outputStream.writeInt((int) row[this.dictSortDimIdx[idx]]);
-    }
-
-    // write no-dict & sort
-    for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
-      outputStream.writeShort(bytes.length);
-      outputStream.write(bytes);
-    }
-
-    // pack no-sort
-    rowBuffer.clear();
-    packNoSortFieldsToBytes(row, rowBuffer);
-    rowBuffer.flip();
-    int packSize = rowBuffer.limit();
-
-    // write no-sort
-    outputStream.writeInt(packSize);
-    outputStream.write(rowBuffer.array(), 0, packSize);
-  }
-
-  /**
-   * Read intermediate sort temp row from unsafe memory.
-   * This method is used during merge sort phase for off-heap sort.
-   *
-   * @param baseObject base object of memory block
-   * @param address address of the row
-   * @return intermediate sort temp row
-   */
-  public IntermediateSortTempRow 
readIntermediateSortTempRowFromUnsafeMemory(Object baseObject,
-      long address) {
-    int size = 0;
-
-    int[] dictSortDims = new int[this.dictSortDimCnt];
-    byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][];
-
-    // read dict & sort dim
-    for (int idx = 0; idx < dictSortDims.length; idx++) {
-      dictSortDims[idx] = CarbonUnsafe.getUnsafe().getInt(baseObject, address 
+ size);
-      size += 4;
-    }
-
-    // read no-dict & sort dim
-    for (int idx = 0; idx < noDictSortDims.length; idx++) {
-      short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + 
size);
-      size += 2;
-      byte[] bytes = new byte[length];
-      CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
-          bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-      size += length;
-      noDictSortDims[idx] = bytes;
-    }
-
-    // read no-sort dims & measures
-    int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
-    size += 4;
-    byte[] noSortDimsAndMeasures = new byte[len];
-    CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
-        noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len);
-
-    return new IntermediateSortTempRow(dictSortDims, noDictSortDims, 
noSortDimsAndMeasures);
-  }
-
-  /**
-   * Write intermediate sort temp row directly from unsafe memory to stream.
-   * This method is used at the late beginning of the sort phase to write 
in-memory pages
-   * to sort temp file. Comparing with reading intermediate sort temp row from 
memory and then
-   * writing it, Writing directly from memory to stream will save the 
intermediate trivial loss.
-   *
-   * @param baseObject base object of the memory block
-   * @param address base address of the row
-   * @param outputStream output stream
-   * @throws IOException if error occurs while writing to stream
-   */
-  public void writeIntermediateSortTempRowFromUnsafeMemoryToStream(Object 
baseObject,
-      long address, DataOutputStream outputStream) throws IOException {
-    int size = 0;
-
-    // dict & sort
-    for (int idx = 0; idx < dictSortDimCnt; idx++) {
-      outputStream.writeInt(CarbonUnsafe.getUnsafe().getInt(baseObject, 
address + size));
-      size += 4;
-    }
-
-    // no-dict & sort
-    for (int idx = 0; idx < noDictSortDimCnt; idx++) {
-      short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + 
size);
-      size += 2;
-      byte[] bytes = new byte[length];
-      CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
-          bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-      size += length;
-
-      outputStream.writeShort(length);
-      outputStream.write(bytes);
-    }
-
-    // packed no-sort & measure
-    int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
-    size += 4;
-    byte[] noSortDimsAndMeasures = new byte[len];
-    CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
-        noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len);
-    size += len;
-
-    outputStream.writeInt(len);
-    outputStream.write(noSortDimsAndMeasures);
-  }
-
-  /**
-   * Write raw row as an intermediate sort temp row to memory.
-   * This method is used in the beginning of the off-heap sort phase. 
Comparing with converting
-   * raw row to intermediate sort temp row and then writing the converted one,
-   * Writing raw row directly will save the intermediate trivial loss.
-   * This method use an array backend buffer to save memory allocation. The 
buffer will be reused
-   * for all rows (per thread).
-   *
-   * @param row raw row
-   * @param baseObject base object of the memory block
-   * @param address base address for the row
-   * @param rowBuffer array backend buffer
-   * @return number of bytes written to memory
-   */
-  public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row,
-      Object baseObject, long address, ByteBuffer rowBuffer) {
-    int size = 0;
-    // write dict & sort
-    for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
-      CarbonUnsafe.getUnsafe()
-          .putInt(baseObject, address + size, (int) 
row[this.dictSortDimIdx[idx]]);
-      size += 4;
-    }
-
-    // write no-dict & sort
-    for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
-      CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) 
bytes.length);
-      size += 2;
-      CarbonUnsafe.getUnsafe()
-          .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, 
address + size,
-              bytes.length);
-      size += bytes.length;
-    }
-
-    // convert pack no-sort
-    rowBuffer.clear();
-    packNoSortFieldsToBytes(row, rowBuffer);
-    rowBuffer.flip();
-    int packSize = rowBuffer.limit();
-
-    // write no-sort
-    CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, packSize);
-    size += 4;
-    CarbonUnsafe.getUnsafe()
-        .copyMemory(rowBuffer.array(), CarbonUnsafe.BYTE_ARRAY_OFFSET, 
baseObject, address + size,
-            packSize);
-    size += packSize;
-    return size;
-  }
-
-  /**
-   * Pack to no-sort fields to byte array
-   *
-   * @param row raw row
-   * @param rowBuffer byte array backend buffer
-   */
-  private void packNoSortFieldsToBytes(Object[] row, ByteBuffer rowBuffer) {
-    // convert dict & no-sort
-    for (int idx = 0; idx < this.dictNoSortDimCnt; idx++) {
-      rowBuffer.putInt((int) row[this.dictNoSortDimIdx[idx]]);
-    }
-    // convert no-dict & no-sort
-    for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
-      byte[] bytes = (byte[]) row[this.noDictNoSortDimIdx[idx]];
-      rowBuffer.putShort((short) bytes.length);
-      rowBuffer.put(bytes);
-    }
-
-    // convert measure
-    Object tmpValue;
-    DataType tmpDataType;
-    for (int idx = 0; idx < this.measureCnt; idx++) {
-      tmpValue = row[this.measureIdx[idx]];
-      tmpDataType = this.dataTypes[idx];
-      if (null == tmpValue) {
-        rowBuffer.put((byte) 0);
-        continue;
-      }
-      rowBuffer.put((byte) 1);
-      if (DataTypes.BOOLEAN == tmpDataType) {
-        if ((boolean) tmpValue) {
-          rowBuffer.put((byte) 1);
-        } else {
-          rowBuffer.put((byte) 0);
-        }
-      } else if (DataTypes.SHORT == tmpDataType) {
-        rowBuffer.putShort((Short) tmpValue);
-      } else if (DataTypes.INT == tmpDataType) {
-        rowBuffer.putInt((Integer) tmpValue);
-      } else if (DataTypes.LONG == tmpDataType) {
-        rowBuffer.putLong((Long) tmpValue);
-      } else if (DataTypes.DOUBLE == tmpDataType) {
-        rowBuffer.putDouble((Double) tmpValue);
-      } else if (DataTypes.isDecimal(tmpDataType)) {
-        byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) 
tmpValue);
-        rowBuffer.putShort((short) decimalBytes.length);
-        rowBuffer.put(decimalBytes);
-      } else {
-        throw new IllegalArgumentException("Unsupported data type: " + 
tmpDataType);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
new file mode 100644
index 0000000..c4e4756
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort;
+
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+
+public class SortStepRowUtil {
+  private int measureCount;
+  private int dimensionCount;
+  private int complexDimensionCount;
+  private int noDictionaryCount;
+  private int[] dictDimIdx;
+  private int[] nonDictIdx;
+  private int[] measureIdx;
+
+  public SortStepRowUtil(SortParameters parameters) {
+    this.measureCount = parameters.getMeasureColCount();
+    this.dimensionCount = parameters.getDimColCount();
+    this.complexDimensionCount = parameters.getComplexDimColCount();
+    this.noDictionaryCount = parameters.getNoDictionaryCount();
+    boolean[] isNoDictionaryDimensionColumn = 
parameters.getNoDictionaryDimnesionColumn();
+
+    int index = 0;
+    int nonDicIndex = 0;
+    int allCount = 0;
+
+    // be careful that the default value is 0
+    this.dictDimIdx = new int[dimensionCount - noDictionaryCount];
+    this.nonDictIdx = new int[noDictionaryCount + complexDimensionCount];
+    this.measureIdx = new int[measureCount];
+
+    // indices for dict dim columns
+    for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
+      if (isNoDictionaryDimensionColumn[i]) {
+        nonDictIdx[nonDicIndex++] = i;
+      } else {
+        dictDimIdx[index++] = allCount;
+      }
+      allCount++;
+    }
+
+    // indices for non dict dim/complex columns
+    for (int i = 0; i < complexDimensionCount; i++) {
+      nonDictIdx[nonDicIndex++] = allCount;
+      allCount++;
+    }
+
+    // indices for measure columns
+    for (int i = 0; i < measureCount; i++) {
+      measureIdx[i] = allCount;
+      allCount++;
+    }
+  }
+
+  public Object[] convertRow(Object[] data) {
+    // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
+    Object[] holder = new Object[3];
+    try {
+
+      int[] dictDims = new int[dimensionCount - noDictionaryCount];
+      byte[][] nonDictArray = new byte[noDictionaryCount + 
complexDimensionCount][];
+      Object[] measures = new Object[measureCount];
+
+      // write dict dim data
+      for (int idx = 0; idx < dictDimIdx.length; idx++) {
+        dictDims[idx] = (int) data[dictDimIdx[idx]];
+      }
+
+      // write non dict dim data
+      for (int idx = 0; idx < nonDictIdx.length; idx++) {
+        nonDictArray[idx] = (byte[]) data[nonDictIdx[idx]];
+      }
+
+      // write measure data
+      for (int idx = 0; idx < measureIdx.length; idx++) {
+        measures[idx] = data[measureIdx[idx]];
+      }
+      NonDictionaryUtil.prepareOutObj(holder, dictDims, nonDictArray, 
measures);
+
+      // increment number if record read
+    } catch (Exception e) {
+      throw new RuntimeException("Problem while converting row ", e);
+    }
+    //return out row
+    return holder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index 7ea5cb3..e5583c2 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -19,20 +19,35 @@ package 
org.apache.carbondata.processing.loading.sort.unsafe;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.math.BigDecimal;
+import java.util.Arrays;
 
+import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.IntPointerBuffer;
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
  * It can keep the data of prescribed size data in offheap/onheap memory and 
returns it when needed
  */
 public class UnsafeCarbonRowPage {
+
+  private boolean[] noDictionaryDimensionMapping;
+
+  private boolean[] noDictionarySortColumnMapping;
+
+  private int dimensionSize;
+
+  private int measureSize;
+
+  private DataType[] measureDataType;
+
+  private long[] nullSetWords;
+
   private IntPointerBuffer buffer;
 
   private int lastSize;
@@ -47,14 +62,16 @@ public class UnsafeCarbonRowPage {
 
   private long taskId;
 
-  private TableFieldStat tableFieldStat;
-  private SortStepRowHandler sortStepRowHandler;
-
-  public UnsafeCarbonRowPage(TableFieldStat tableFieldStat, MemoryBlock 
memoryBlock,
-      boolean saveToDisk, long taskId) {
-    this.tableFieldStat = tableFieldStat;
-    this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
+  public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
+      boolean[] noDictionarySortColumnMapping, int dimensionSize, int 
measureSize, DataType[] type,
+      MemoryBlock memoryBlock, boolean saveToDisk, long taskId) {
+    this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
+    this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
+    this.dimensionSize = dimensionSize;
+    this.measureSize = measureSize;
+    this.measureDataType = type;
     this.saveToDisk = saveToDisk;
+    this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
     this.taskId = taskId;
     buffer = new IntPointerBuffer(this.taskId);
     this.dataBlock = memoryBlock;
@@ -63,44 +80,255 @@ public class UnsafeCarbonRowPage {
     this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
   }
 
-  public int addRow(Object[] row, ByteBuffer rowBuffer) {
-    int size = addRow(row, dataBlock.getBaseOffset() + lastSize, rowBuffer);
+  public int addRow(Object[] row) {
+    int size = addRow(row, dataBlock.getBaseOffset() + lastSize);
     buffer.set(lastSize);
     lastSize = lastSize + size;
     return size;
   }
 
-  /**
-   * add raw row as intermidiate sort temp row to page
-   *
-   * @param row
-   * @param address
-   * @return
-   */
-  private int addRow(Object[] row, long address, ByteBuffer rowBuffer) {
-    return 
sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToUnsafeMemory(row,
-        dataBlock.getBaseObject(), address, rowBuffer);
+  private int addRow(Object[] row, long address) {
+    if (row == null) {
+      throw new RuntimeException("Row is null ??");
+    }
+    int dimCount = 0;
+    int size = 0;
+    Object baseObject = dataBlock.getBaseObject();
+    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+      if (noDictionaryDimensionMapping[dimCount]) {
+        byte[] col = (byte[]) row[dimCount];
+        CarbonUnsafe.getUnsafe()
+            .putShort(baseObject, address + size, (short) col.length);
+        size += 2;
+        CarbonUnsafe.getUnsafe().copyMemory(col, 
CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+            address + size, col.length);
+        size += col.length;
+      } else {
+        int value = (int) row[dimCount];
+        CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, value);
+        size += 4;
+      }
+    }
+
+    // write complex dimensions here.
+    for (; dimCount < dimensionSize; dimCount++) {
+      byte[] col = (byte[]) row[dimCount];
+      CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) 
col.length);
+      size += 2;
+      CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, 
baseObject,
+          address + size, col.length);
+      size += col.length;
+    }
+    Arrays.fill(nullSetWords, 0);
+    int nullSetSize = nullSetWords.length * 8;
+    int nullWordLoc = size;
+    size += nullSetSize;
+    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+      Object value = row[mesCount + dimensionSize];
+      if (null != value) {
+        DataType dataType = measureDataType[mesCount];
+        if (dataType == DataTypes.BOOLEAN) {
+          Boolean bval = (Boolean) value;
+          CarbonUnsafe.getUnsafe().putBoolean(baseObject, address + size, 
bval);
+          size += 1;
+        } else if (dataType == DataTypes.SHORT) {
+          Short sval = (Short) value;
+          CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, sval);
+          size += 2;
+        } else if (dataType == DataTypes.INT) {
+          Integer ival = (Integer) value;
+          CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, ival);
+          size += 4;
+        } else if (dataType == DataTypes.LONG) {
+          Long val = (Long) value;
+          CarbonUnsafe.getUnsafe().putLong(baseObject, address + size, val);
+          size += 8;
+        } else if (dataType == DataTypes.DOUBLE) {
+          Double doubleVal = (Double) value;
+          CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, 
doubleVal);
+          size += 8;
+        } else if (DataTypes.isDecimal(dataType)) {
+          BigDecimal decimalVal = (BigDecimal) value;
+          byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal);
+          CarbonUnsafe.getUnsafe()
+              .putShort(baseObject, address + size, (short) 
bigDecimalInBytes.length);
+          size += 2;
+          CarbonUnsafe.getUnsafe()
+              .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, 
baseObject,
+                  address + size, bigDecimalInBytes.length);
+          size += bigDecimalInBytes.length;
+        } else {
+          throw new IllegalArgumentException("unsupported data type:" + 
measureDataType[mesCount]);
+        }
+        set(nullSetWords, mesCount);
+      } else {
+        unset(nullSetWords, mesCount);
+      }
+    }
+    CarbonUnsafe.getUnsafe().copyMemory(nullSetWords, 
CarbonUnsafe.LONG_ARRAY_OFFSET, baseObject,
+        address + nullWordLoc, nullSetSize);
+    return size;
   }
 
-  /**
-   * get one row from memory address
-   * @param address address
-   * @return one row
-   */
-  public IntermediateSortTempRow getRow(long address) {
-    return sortStepRowHandler.readIntermediateSortTempRowFromUnsafeMemory(
-        dataBlock.getBaseObject(), address);
+  public Object[] getRow(long address, Object[] rowToFill) {
+    int dimCount = 0;
+    int size = 0;
+
+    Object baseObject = dataBlock.getBaseObject();
+    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+      if (noDictionaryDimensionMapping[dimCount]) {
+        short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + 
size);
+        byte[] col = new byte[aShort];
+        size += 2;
+        CarbonUnsafe.getUnsafe()
+            .copyMemory(baseObject, address + size, col, 
CarbonUnsafe.BYTE_ARRAY_OFFSET,
+                col.length);
+        size += col.length;
+        rowToFill[dimCount] = col;
+      } else {
+        int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + 
size);
+        size += 4;
+        rowToFill[dimCount] = anInt;
+      }
+    }
+
+    // write complex dimensions here.
+    for (; dimCount < dimensionSize; dimCount++) {
+      short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + 
size);
+      byte[] col = new byte[aShort];
+      size += 2;
+      CarbonUnsafe.getUnsafe()
+          .copyMemory(baseObject, address + size, col, 
CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
+      size += col.length;
+      rowToFill[dimCount] = col;
+    }
+
+    int nullSetSize = nullSetWords.length * 8;
+    Arrays.fill(nullSetWords, 0);
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(baseObject, address + size, nullSetWords, 
CarbonUnsafe.LONG_ARRAY_OFFSET,
+            nullSetSize);
+    size += nullSetSize;
+
+    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+      if (isSet(nullSetWords, mesCount)) {
+        DataType dataType = measureDataType[mesCount];
+        if (dataType == DataTypes.BOOLEAN) {
+          Boolean bval = CarbonUnsafe.getUnsafe().getBoolean(baseObject, 
address + size);
+          size += 1;
+          rowToFill[dimensionSize + mesCount] = bval;
+        } else if (dataType == DataTypes.SHORT) {
+          Short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + 
size);
+          size += 2;
+          rowToFill[dimensionSize + mesCount] = sval;
+        } else if (dataType == DataTypes.INT) {
+          Integer ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + 
size);
+          size += 4;
+          rowToFill[dimensionSize + mesCount] = ival;
+        } else if (dataType == DataTypes.LONG) {
+          Long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + 
size);
+          size += 8;
+          rowToFill[dimensionSize + mesCount] = val;
+        } else if (dataType == DataTypes.DOUBLE) {
+          Double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, 
address + size);
+          size += 8;
+          rowToFill[dimensionSize + mesCount] = doubleVal;
+        } else if (DataTypes.isDecimal(dataType)) {
+          short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address 
+ size);
+          byte[] bigDecimalInBytes = new byte[aShort];
+          size += 2;
+          CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, 
bigDecimalInBytes,
+              CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+          size += bigDecimalInBytes.length;
+          rowToFill[dimensionSize + mesCount] = 
DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
+        } else {
+          throw new IllegalArgumentException("unsupported data type:" + 
measureDataType[mesCount]);
+        }
+      } else {
+        rowToFill[dimensionSize + mesCount] = null;
+      }
+    }
+    return rowToFill;
   }
 
-  /**
-   * write a row to stream
-   * @param address address of a row
-   * @param stream stream
-   * @throws IOException
-   */
-  public void writeRow(long address, DataOutputStream stream) throws 
IOException {
-    sortStepRowHandler.writeIntermediateSortTempRowFromUnsafeMemoryToStream(
-        dataBlock.getBaseObject(), address, stream);
+  public void fillRow(long address, DataOutputStream stream) throws 
IOException {
+    int dimCount = 0;
+    int size = 0;
+
+    Object baseObject = dataBlock.getBaseObject();
+    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+      if (noDictionaryDimensionMapping[dimCount]) {
+        short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + 
size);
+        byte[] col = new byte[aShort];
+        size += 2;
+        CarbonUnsafe.getUnsafe()
+            .copyMemory(baseObject, address + size, col, 
CarbonUnsafe.BYTE_ARRAY_OFFSET,
+                col.length);
+        size += col.length;
+        stream.writeShort(aShort);
+        stream.write(col);
+      } else {
+        int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + 
size);
+        size += 4;
+        stream.writeInt(anInt);
+      }
+    }
+
+    // write complex dimensions here.
+    for (; dimCount < dimensionSize; dimCount++) {
+      short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + 
size);
+      byte[] col = new byte[aShort];
+      size += 2;
+      CarbonUnsafe.getUnsafe()
+          .copyMemory(baseObject, address + size, col, 
CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
+      size += col.length;
+      stream.writeShort(aShort);
+      stream.write(col);
+    }
+
+    int nullSetSize = nullSetWords.length * 8;
+    Arrays.fill(nullSetWords, 0);
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(baseObject, address + size, nullSetWords, 
CarbonUnsafe.LONG_ARRAY_OFFSET,
+            nullSetSize);
+    size += nullSetSize;
+    for (int i = 0; i < nullSetWords.length; i++) {
+      stream.writeLong(nullSetWords[i]);
+    }
+
+    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+      if (isSet(nullSetWords, mesCount)) {
+        DataType dataType = measureDataType[mesCount];
+        if (dataType == DataTypes.SHORT) {
+          short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + 
size);
+          size += 2;
+          stream.writeShort(sval);
+        } else if (dataType == DataTypes.INT) {
+          int ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + 
size);
+          size += 4;
+          stream.writeInt(ival);
+        } else if (dataType == DataTypes.LONG) {
+          long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + 
size);
+          size += 8;
+          stream.writeLong(val);
+        } else if (dataType == DataTypes.DOUBLE) {
+          double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, 
address + size);
+          size += 8;
+          stream.writeDouble(doubleVal);
+        } else if (DataTypes.isDecimal(dataType)) {
+          short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address 
+ size);
+          byte[] bigDecimalInBytes = new byte[aShort];
+          size += 2;
+          CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, 
bigDecimalInBytes,
+              CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+          size += bigDecimalInBytes.length;
+          stream.writeShort(aShort);
+          stream.write(bigDecimalInBytes);
+        } else {
+          throw new IllegalArgumentException("unsupported data type:" + 
measureDataType[mesCount]);
+        }
+      }
+    }
   }
 
   public void freeMemory() {
@@ -134,8 +362,27 @@ public class UnsafeCarbonRowPage {
     return dataBlock;
   }
 
-  public TableFieldStat getTableFieldStat() {
-    return tableFieldStat;
+  public static void set(long[] words, int index) {
+    int wordOffset = (index >> 6);
+    words[wordOffset] |= (1L << index);
+  }
+
+  public static void unset(long[] words, int index) {
+    int wordOffset = (index >> 6);
+    words[wordOffset] &= ~(1L << index);
+  }
+
+  public static boolean isSet(long[] words, int index) {
+    int wordOffset = (index >> 6);
+    return ((words[wordOffset] & (1L << index)) != 0);
+  }
+
+  public boolean[] getNoDictionaryDimensionMapping() {
+    return noDictionaryDimensionMapping;
+  }
+
+  public boolean[] getNoDictionarySortColumnMapping() {
+    return noDictionarySortColumnMapping;
   }
 
   public void setNewDataBlock(MemoryBlock newMemoryBlock) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 5d038d3..4dd5e44 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.processing.loading.sort.unsafe;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -42,14 +41,13 @@ import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator;
-import 
org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDims;
+import 
org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDIms;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
 import org.apache.carbondata.processing.loading.sort.unsafe.sort.TimSort;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.sort.UnsafeIntSortDataFormat;
 import 
org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 public class UnsafeSortDataRows {
@@ -71,8 +69,7 @@ public class UnsafeSortDataRows {
    */
 
   private SortParameters parameters;
-  private TableFieldStat tableFieldStat;
-  private ThreadLocal<ByteBuffer> rowBuffer;
+
   private UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger;
 
   private UnsafeCarbonRowPage rowPage;
@@ -97,13 +94,7 @@ public class UnsafeSortDataRows {
   public UnsafeSortDataRows(SortParameters parameters,
       UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int 
inMemoryChunkSize) {
     this.parameters = parameters;
-    this.tableFieldStat = new TableFieldStat(parameters);
-    this.rowBuffer = new ThreadLocal<ByteBuffer>() {
-      @Override protected ByteBuffer initialValue() {
-        byte[] backedArray = new byte[2 * 1024 * 1024];
-        return ByteBuffer.wrap(backedArray);
-      }
-    };
+
     this.unsafeInMemoryIntermediateFileMerger = 
unsafeInMemoryIntermediateFileMerger;
 
     // observer of writing file in thread
@@ -136,7 +127,11 @@ public class UnsafeSortDataRows {
     if (isMemoryAvailable) {
       UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
     }
-    this.rowPage = new UnsafeCarbonRowPage(tableFieldStat, baseBlock, 
!isMemoryAvailable, taskId);
+    this.rowPage = new 
UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+        parameters.getNoDictionarySortColumn(),
+        parameters.getDimColCount() + parameters.getComplexDimColCount(),
+        parameters.getMeasureColCount(), parameters.getMeasureDataType(), 
baseBlock,
+        !isMemoryAvailable, taskId);
     // Delete if any older file exists in sort temp folder
     deleteSortLocationIfExists();
 
@@ -183,7 +178,7 @@ public class UnsafeSortDataRows {
   private void addBatch(Object[][] rowBatch, int size) throws 
CarbonSortKeyAndGroupByException {
     for (int i = 0; i < size; i++) {
       if (rowPage.canAdd()) {
-        bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
+        bytesAdded += rowPage.addRow(rowBatch[i]);
       } else {
         try {
           if (enableInMemoryIntermediateMerge) {
@@ -199,8 +194,15 @@ public class UnsafeSortDataRows {
           if (!saveToDisk) {
             
UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
           }
-          rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, 
saveToDisk, taskId);
-          bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
+          rowPage = new UnsafeCarbonRowPage(
+                  parameters.getNoDictionaryDimnesionColumn(),
+                  parameters.getNoDictionarySortColumn(),
+                  parameters.getDimColCount() + 
parameters.getComplexDimColCount(),
+                  parameters.getMeasureColCount(),
+                  parameters.getMeasureDataType(),
+                  memoryBlock,
+                  saveToDisk, taskId);
+          bytesAdded += rowPage.addRow(rowBatch[i]);
         } catch (Exception e) {
           LOGGER.error(
                   "exception occurred while trying to acquire a semaphore 
lock: " + e.getMessage());
@@ -218,7 +220,7 @@ public class UnsafeSortDataRows {
     // if record holder list size is equal to sort buffer size then it will
     // sort the list and then write current list data to file
     if (rowPage.canAdd()) {
-      rowPage.addRow(row, rowBuffer.get());
+      rowPage.addRow(row);
     } else {
       try {
         if (enableInMemoryIntermediateMerge) {
@@ -233,8 +235,13 @@ public class UnsafeSortDataRows {
         if (!saveToDisk) {
           
UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
         }
-        rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, 
saveToDisk, taskId);
-        rowPage.addRow(row, rowBuffer.get());
+        rowPage = new UnsafeCarbonRowPage(
+            parameters.getNoDictionaryDimnesionColumn(),
+            parameters.getNoDictionarySortColumn(),
+            parameters.getDimColCount(), parameters.getMeasureColCount(),
+            parameters.getMeasureDataType(), memoryBlock,
+            saveToDisk, taskId);
+        rowPage.addRow(row);
       } catch (Exception e) {
         LOGGER.error(
             "exception occurred while trying to acquire a semaphore lock: " + 
e.getMessage());
@@ -262,7 +269,7 @@ public class UnsafeSortDataRows {
             new UnsafeRowComparator(rowPage));
       } else {
         timSort.sort(rowPage.getBuffer(), 0, 
rowPage.getBuffer().getActualSize(),
-            new UnsafeRowComparatorForNormalDims(rowPage));
+            new UnsafeRowComparatorForNormalDIms(rowPage));
       }
       unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage);
     } else {
@@ -288,9 +295,10 @@ public class UnsafeSortDataRows {
       // write number of entries to the file
       stream.writeInt(actualSize);
       for (int i = 0; i < actualSize; i++) {
-        rowPage.writeRow(
-            rowPage.getBuffer().get(i) + 
rowPage.getDataBlock().getBaseOffset(), stream);
+        rowPage.fillRow(rowPage.getBuffer().get(i) + 
rowPage.getDataBlock().getBaseOffset(),
+            stream);
       }
+
     } catch (IOException e) {
       throw new CarbonSortKeyAndGroupByException("Problem while writing the 
file", e);
     } finally {
@@ -359,7 +367,7 @@ public class UnsafeSortDataRows {
               new UnsafeRowComparator(page));
         } else {
           timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
-              new UnsafeRowComparatorForNormalDims(page));
+              new UnsafeRowComparatorForNormalDIms(page));
         }
         if (page.isSaveToDisk()) {
           // create a new file every time
@@ -372,8 +380,7 @@ public class UnsafeSortDataRows {
           writeDataToFile(page, sortTempFile);
           LOGGER.info("Time taken to sort row page with size" + 
page.getBuffer().getActualSize()
               + " and write is: " + (System.currentTimeMillis() - startTime) + 
": location:"
-              + sortTempFile + ", sort temp file size in MB is "
-              + sortTempFile.length() * 0.1 * 10 / 1024 / 1024);
+              + sortTempFile);
           page.freeMemory();
           // add sort temp filename to and arrayList. When the list size 
reaches 20 then
           // intermediate merging of sort temp files will be triggered

http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
index 33342dc..d02be9b 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
@@ -23,25 +23,63 @@ import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
 
 public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
+
+  /**
+   * mapping of dictionary and no dictionary of sort_columns.
+   */
+  private boolean[] noDictionarySortColumnMaping;
+
   private Object baseObject;
-  private TableFieldStat tableFieldStat;
-  private int dictSizeInMemory;
 
   public UnsafeRowComparator(UnsafeCarbonRowPage rowPage) {
+    this.noDictionarySortColumnMaping = 
rowPage.getNoDictionarySortColumnMapping();
     this.baseObject = rowPage.getDataBlock().getBaseObject();
-    this.tableFieldStat = rowPage.getTableFieldStat();
-    this.dictSizeInMemory = (tableFieldStat.getDictSortDimCnt()
-        + tableFieldStat.getDictNoSortDimCnt()) * 4;
   }
 
   /**
    * Below method will be used to compare two mdkey
    */
   public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
-    return compare(rowL, baseObject, rowR, baseObject);
+    int diff = 0;
+    long rowA = rowL.address;
+    long rowB = rowR.address;
+    int sizeA = 0;
+    int sizeB = 0;
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
+      if (isNoDictionary) {
+        short aShort1 = CarbonUnsafe.getUnsafe().getShort(baseObject, rowA + 
sizeA);
+        byte[] byteArr1 = new byte[aShort1];
+        sizeA += 2;
+        CarbonUnsafe.getUnsafe().copyMemory(baseObject, rowA + sizeA, byteArr1,
+            CarbonUnsafe.BYTE_ARRAY_OFFSET, aShort1);
+        sizeA += aShort1;
+
+        short aShort2 = CarbonUnsafe.getUnsafe().getShort(baseObject, rowB + 
sizeB);
+        byte[] byteArr2 = new byte[aShort2];
+        sizeB += 2;
+        CarbonUnsafe.getUnsafe().copyMemory(baseObject, rowB + sizeB, byteArr2,
+            CarbonUnsafe.BYTE_ARRAY_OFFSET, aShort2);
+        sizeB += aShort2;
+
+        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+        if (difference != 0) {
+          return difference;
+        }
+      } else {
+        int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + 
sizeA);
+        sizeA += 4;
+        int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + 
sizeB);
+        sizeB += 4;
+        diff = dimFieldA - dimFieldB;
+        if (diff != 0) {
+          return diff;
+        }
+      }
+    }
+
+    return diff;
   }
 
   /**
@@ -52,40 +90,35 @@ public class UnsafeRowComparator implements 
Comparator<UnsafeCarbonRow> {
     int diff = 0;
     long rowA = rowL.address;
     long rowB = rowR.address;
-    int sizeInDictPartA = 0;
-
-    int sizeInNonDictPartA = 0;
-    int sizeInDictPartB = 0;
-    int sizeInNonDictPartB = 0;
-    for (boolean isNoDictionary : tableFieldStat.getIsSortColNoDictFlags()) {
+    int sizeA = 0;
+    int sizeB = 0;
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
       if (isNoDictionary) {
-        short lengthA = CarbonUnsafe.getUnsafe().getShort(baseObjectL,
-            rowA + dictSizeInMemory + sizeInNonDictPartA);
-        byte[] byteArr1 = new byte[lengthA];
-        sizeInNonDictPartA += 2;
+        short aShort1 = CarbonUnsafe.getUnsafe().getShort(baseObjectL, rowA + 
sizeA);
+        byte[] byteArr1 = new byte[aShort1];
+        sizeA += 2;
         CarbonUnsafe.getUnsafe()
-            .copyMemory(baseObjectL, rowA + dictSizeInMemory + 
sizeInNonDictPartA,
-                byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthA);
-        sizeInNonDictPartA += lengthA;
+            .copyMemory(baseObjectL, rowA + sizeA, byteArr1, 
CarbonUnsafe.BYTE_ARRAY_OFFSET,
+                aShort1);
+        sizeA += aShort1;
 
-        short lengthB = CarbonUnsafe.getUnsafe().getShort(baseObjectR,
-            rowB + dictSizeInMemory + sizeInNonDictPartB);
-        byte[] byteArr2 = new byte[lengthB];
-        sizeInNonDictPartB += 2;
+        short aShort2 = CarbonUnsafe.getUnsafe().getShort(baseObjectR, rowB + 
sizeB);
+        byte[] byteArr2 = new byte[aShort2];
+        sizeB += 2;
         CarbonUnsafe.getUnsafe()
-            .copyMemory(baseObjectR, rowB + dictSizeInMemory + 
sizeInNonDictPartB,
-                byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthB);
-        sizeInNonDictPartB += lengthB;
+            .copyMemory(baseObjectR, rowB + sizeB, byteArr2, 
CarbonUnsafe.BYTE_ARRAY_OFFSET,
+                aShort2);
+        sizeB += aShort2;
 
         int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
         if (difference != 0) {
           return difference;
         }
       } else {
-        int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + 
sizeInDictPartA);
-        sizeInDictPartA += 4;
-        int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObjectR, rowB + 
sizeInDictPartB);
-        sizeInDictPartB += 4;
+        int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + 
sizeA);
+        sizeA += 4;
+        int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObjectR, rowB + 
sizeB);
+        sizeB += 4;
         diff = dimFieldA - dimFieldB;
         if (diff != 0) {
           return diff;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
new file mode 100644
index 0000000..483dcb2
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import 
org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import 
org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
+
+public class UnsafeRowComparatorForNormalDIms implements 
Comparator<UnsafeCarbonRow> {
+
+  private Object baseObject;
+
+  private int numberOfSortColumns;
+
+  public UnsafeRowComparatorForNormalDIms(UnsafeCarbonRowPage rowPage) {
+    this.baseObject = rowPage.getDataBlock().getBaseObject();
+    this.numberOfSortColumns = 
rowPage.getNoDictionarySortColumnMapping().length;
+  }
+
+  /**
+   * Below method will be used to compare two mdkey
+   */
+  public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
+    int diff = 0;
+    long rowA = rowL.address;
+    long rowB = rowR.address;
+    int sizeA = 0;
+    int sizeB = 0;
+    for (int i = 0; i < numberOfSortColumns; i++) {
+      int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + 
sizeA);
+      sizeA += 4;
+      int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + 
sizeB);
+      sizeB += 4;
+      diff = dimFieldA - dimFieldB;
+      if (diff != 0) {
+        return diff;
+      }
+    }
+
+    return diff;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java
deleted file mode 100644
index e9cfb1c..0000000
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import 
org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import 
org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
-
-public class UnsafeRowComparatorForNormalDims implements 
Comparator<UnsafeCarbonRow> {
-
-  private Object baseObject;
-
-  private int numberOfSortColumns;
-
-  public UnsafeRowComparatorForNormalDims(UnsafeCarbonRowPage rowPage) {
-    this.baseObject = rowPage.getDataBlock().getBaseObject();
-    this.numberOfSortColumns = 
rowPage.getTableFieldStat().getIsSortColNoDictFlags().length;
-  }
-
-  /**
-   * Below method will be used to compare two mdkey
-   */
-  public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
-    int diff = 0;
-    long rowA = rowL.address;
-    long rowB = rowR.address;
-    int sizeA = 0;
-    int sizeB = 0;
-    for (int i = 0; i < numberOfSortColumns; i++) {
-      int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + 
sizeA);
-      sizeA += 4;
-      int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + 
sizeB);
-      sizeB += 4;
-      diff = dimFieldA - dimFieldB;
-      if (diff != 0) {
-        return diff;
-      }
-    }
-
-    return diff;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
index d790c41..686e855 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.processing.loading.sort.unsafe.holder;
 
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import 
org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 
 /**
@@ -29,7 +28,7 @@ public interface SortTempChunkHolder extends 
Comparable<SortTempChunkHolder> {
 
   void readRow()  throws CarbonSortKeyAndGroupByException;
 
-  IntermediateSortTempRow getRow();
+  Object[] getRow();
 
   int numberOfRows();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index a776db1..6b0cfa6 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -19,10 +19,9 @@ package 
org.apache.carbondata.processing.loading.sort.unsafe.holder;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger;
-import 
org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
 
 public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
 
@@ -39,18 +38,21 @@ public class UnsafeFinalMergePageHolder implements 
SortTempChunkHolder {
 
   private UnsafeCarbonRowPage[] rowPages;
 
-  private IntermediateSortTempRowComparator comparator;
+  private NewRowComparator comparator;
 
-  private IntermediateSortTempRow currentRow;
+  private Object[] currentRow;
+
+  private int columnSize;
 
   public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger 
merger,
-      boolean[] noDictSortColumnMapping) {
+      boolean[] noDictSortColumnMapping, int columnSize) {
     this.actualSize = merger.getEntryCount();
     this.mergedAddresses = merger.getMergedAddresses();
     this.rowPageIndexes = merger.getRowPageIndexes();
     this.rowPages = merger.getUnsafeCarbonRowPages();
     LOGGER.audit("Processing unsafe inmemory rows page with size : " + 
actualSize);
-    this.comparator = new 
IntermediateSortTempRowComparator(noDictSortColumnMapping);
+    this.comparator = new NewRowComparator(noDictSortColumnMapping);
+    this.columnSize = columnSize;
   }
 
   public boolean hasNext() {
@@ -61,11 +63,12 @@ public class UnsafeFinalMergePageHolder implements 
SortTempChunkHolder {
   }
 
   public void readRow() {
-    currentRow = 
rowPages[rowPageIndexes[counter]].getRow(mergedAddresses[counter]);
+    currentRow = new Object[columnSize];
+    rowPages[rowPageIndexes[counter]].getRow(mergedAddresses[counter], 
currentRow);
     counter++;
   }
 
-  public IntermediateSortTempRow getRow() {
+  public Object[] getRow() {
     return currentRow;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
index cbcbbae..6f05088 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -19,9 +19,8 @@ package 
org.apache.carbondata.processing.loading.sort.unsafe.holder;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import 
org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
 
 public class UnsafeInmemoryHolder implements SortTempChunkHolder {
 
@@ -34,18 +33,21 @@ public class UnsafeInmemoryHolder implements 
SortTempChunkHolder {
 
   private UnsafeCarbonRowPage rowPage;
 
-  private IntermediateSortTempRow currentRow;
+  private Object[] currentRow;
 
   private long address;
 
-  private IntermediateSortTempRowComparator comparator;
+  private NewRowComparator comparator;
 
-  public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage) {
+  private int columnSize;
+
+  public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize,
+      int numberOfSortColumns) {
     this.actualSize = rowPage.getBuffer().getActualSize();
     this.rowPage = rowPage;
     LOGGER.audit("Processing unsafe inmemory rows page with size : " + 
actualSize);
-    this.comparator = new IntermediateSortTempRowComparator(
-        rowPage.getTableFieldStat().getIsSortColNoDictFlags());
+    this.comparator = new 
NewRowComparator(rowPage.getNoDictionarySortColumnMapping());
+    this.columnSize = columnSize;
   }
 
   public boolean hasNext() {
@@ -56,12 +58,13 @@ public class UnsafeInmemoryHolder implements 
SortTempChunkHolder {
   }
 
   public void readRow() {
+    currentRow = new Object[columnSize];
     address = rowPage.getBuffer().get(counter);
-    currentRow = rowPage.getRow(address + 
rowPage.getDataBlock().getBaseOffset());
+    rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset(), 
currentRow);
     counter++;
   }
 
-  public IntermediateSortTempRow getRow() {
+  public Object[] getRow() {
     return currentRow;
   }
 

Reply via email to