This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 20179f8  [CARBONDATA-3728] Fix insert failure on partition table with 
local sort
20179f8 is described below

commit 20179f8ed0d6feb63b49aebee6f5a91f21c756e2
Author: ajantha-bhat <ajanthab...@gmail.com>
AuthorDate: Sat Feb 29 19:45:22 2020 +0530

    [CARBONDATA-3728] Fix insert failure on partition table with local sort
    
    Why is this PR needed?
    
    In the new Insert flow, partition column data is maintained at the end till 
convert to 3 steps of the write step.
    
    But when local sort happens before the write step, The mapping is derived 
based on original internal order instead of partition internal order. Hence 
insert fails during sorting.
    
    What changes were proposed in this PR?
    
    Use internal partition order instead of internal order.
    
    Support 1.1 compatibility
    
    avoid impact for sort step of load flow partition.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3645
---
 .../carbondata/core/datastore/TableSpec.java       |  33 ++++++-
 .../command/management/CommonLoadUtils.scala       |   8 +-
 .../datasources/SparkCarbonTableFormat.scala       |   3 +-
 .../StandardPartitionTableLoadingTestCase.scala    |  45 +++++++++
 .../loading/CarbonDataLoadConfiguration.java       |  17 ++++
 .../processing/loading/DataLoadProcessBuilder.java |   4 +-
 .../constants/DataLoadProcessorConstants.java      |   2 +
 .../CarbonRowDataWriterProcessorStepImpl.java      |   7 +-
 .../processing/sort/sortdata/SortParameters.java   |  70 ++++++++++----
 .../processing/sort/sortdata/TableFieldStat.java   | 107 ++++++++++++++++-----
 .../store/CarbonFactDataHandlerModel.java          |   2 +-
 .../processing/util/CarbonDataProcessorUtil.java   | 105 ++++++++++++++++++++
 12 files changed, 351 insertions(+), 52 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
index 631429e..ae6507c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
@@ -30,6 +30,7 @@ import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.Writable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 public class TableSpec {
 
@@ -56,10 +57,40 @@ public class TableSpec {
   private int[] dictDimActualPosition;
   private int[] noDictDimActualPosition;
 
-  public TableSpec(CarbonTable carbonTable) {
+  public TableSpec(CarbonTable carbonTable, boolean keepPartitionColumnsToEnd) 
{
     this.carbonTable = carbonTable;
     List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
     List<CarbonMeasure> measures = carbonTable.getVisibleMeasures();
+    if (keepPartitionColumnsToEnd && carbonTable.getPartitionInfo() != null) {
+      // keep the partition columns in the end
+      List<CarbonDimension> reArrangedDimensions = new ArrayList<>();
+      List<CarbonMeasure> reArrangedMeasures = new ArrayList<>();
+      List<CarbonDimension> partitionDimensions = new ArrayList<>();
+      List<CarbonMeasure> partitionMeasures = new ArrayList<>();
+      List<ColumnSchema> columnSchemaList = 
carbonTable.getPartitionInfo().getColumnSchemaList();
+      for (CarbonDimension dim : dimensions) {
+        if (columnSchemaList.contains(dim.getColumnSchema())) {
+          partitionDimensions.add(dim);
+        } else {
+          reArrangedDimensions.add(dim);
+        }
+      }
+      if (partitionDimensions.size() != 0) {
+        reArrangedDimensions.addAll(partitionDimensions);
+      }
+      for (CarbonMeasure measure : measures) {
+        if (columnSchemaList.contains(measure.getColumnSchema())) {
+          partitionMeasures.add(measure);
+        } else {
+          reArrangedMeasures.add(measure);
+        }
+      }
+      if (partitionMeasures.size() != 0) {
+        reArrangedMeasures.addAll(partitionMeasures);
+      }
+      dimensions = reArrangedDimensions;
+      measures = reArrangedMeasures;
+    }
     // first calculate total number of columnar field considering column group 
and complex column
     numSimpleDimensions = 0;
     for (CarbonDimension dimension : dimensions) {
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index f76a3db..a0b2676 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -61,6 +61,7 @@ import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, 
BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.indexserver.DistributedRDDUtils
+import 
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent,
 LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, 
LoadOption}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -609,7 +610,8 @@ object CommonLoadUtils {
       optionsOriginal: mutable.Map[String, String],
       currPartitions: util.List[PartitionSpec]): LogicalRelation = {
     val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
-    val metastoreSchema = if 
(optionsOriginal.contains("no_rearrange_of_rows")) {
+    val metastoreSchema =
+      if 
(optionsOriginal.contains(DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS)) {
       StructType(catalogTable.schema.fields.map{f =>
         val column = table.getColumnByName(f.name)
         val updatedDataType = if (column.getDataType ==
@@ -694,7 +696,7 @@ object CommonLoadUtils {
       fileFormat = new SparkCarbonTableFormat,
       options = options.toMap)(sparkSession = sparkSession)
 
-    if (options.contains("no_rearrange_of_rows")) {
+    if (options.contains(DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS)) {
       CarbonReflectionUtils.getLogicalRelation(hdfsRelation,
         metastoreSchema.toAttributes,
         Some(catalogTable),
@@ -980,7 +982,7 @@ object CommonLoadUtils {
       }
       val opt = collection.mutable.Map() ++ loadParams.optionsOriginal
       if (loadParams.scanResultRDD.isDefined) {
-        opt += (("no_rearrange_of_rows", "true"))
+        opt += ((DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS, "true"))
       }
       // Create and ddd the segment to the tablestatus.
       
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadParams.carbonLoadModel,
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index d93b079..da46177 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -51,6 +51,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, 
CarbonTableOutputFormat}
 import 
org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
+import 
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, 
CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.util.CarbonBadRecordUtil
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
@@ -134,7 +135,7 @@ with Serializable {
       model,
       conf)
     CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
-    if (options.contains("no_rearrange_of_rows")) {
+    if (options.contains(DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS)) {
       model.setLoadWithoutConverterWithoutReArrangeStep(true)
     } else {
       model.setLoadWithoutConverterStep(true)
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index d54b07b..4df678b 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -502,6 +502,51 @@ class StandardPartitionTableLoadingTestCase extends 
QueryTest with BeforeAndAfte
     assert(result.get(0).get(7).equals(dataAndIndexSize._2))
   }
 
+  test("test partition with all sort scope") {
+    sql("drop table if exists origin_csv")
+    sql(
+      s"""
+         | create table origin_csv(col1 int, col2 string, col3 date)
+         | using csv
+         | options('dateFormat'='yyyy-MM-dd', 'timestampFormat'='yyyy-MM-dd 
HH:mm:ss')
+         | """.stripMargin)
+    sql("insert into origin_csv select 1, '3aa', to_date('2019-11-11')")
+    sql("insert into origin_csv select 2, '2bb', to_date('2019-11-12')")
+    sql("insert into origin_csv select 3, '1cc', to_date('2019-11-13')")
+    verifyInsertForPartitionTable("tbl_p_ns", "no_sort")
+    verifyInsertForPartitionTable("tbl_p_ls", "local_sort")
+    verifyInsertForPartitionTable("tbl_p_gs", "global_sort")
+    sql("drop table origin_csv")
+  }
+
+  def verifyInsertForPartitionTable(tableName: String, sort_scope: String): 
Unit = {
+    sql(s"drop table if exists $tableName")
+    sql(
+      s"""
+         | create table $tableName (
+         | col1 int,
+         | col2 string,
+         | col3 date,
+         | col4 timestamp,
+         | col5 float
+         | )
+         | using carbondata
+         | options('dateFormat'='yyyy-MM-dd', 'timestampFormat'='yyyy-MM-dd 
HH:mm:ss',
+         | 'sort_scope'='${ sort_scope }', 'sort_columns'='col2')
+         | partitioned by(col3, col4)
+     """.stripMargin)
+    sql(
+      s"""
+         | insert into $tableName (
+         |  select col1, col2, 1.2, col3, to_timestamp('2019-02-02 13:01:01') 
from origin_csv
+         |  union all
+         |  select 123,'abc', 1.2, to_date('2019-01-01'), 
to_timestamp('2019-02-02 13:01:01'))
+         |  """.stripMargin
+    )
+    checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(4)))
+    sql(s"drop table $tableName")
+  }
+
   def getDataAndIndexSize(path: String): (String, String) = {
     val mergeIndexFiles = FileFactory.getCarbonFile(path).listFiles(new 
CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = {
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 1af4fe3..7d0d87a 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -250,6 +250,23 @@ public class CarbonDataLoadConfiguration {
     return type;
   }
 
+  public DataType[] getMeasureDataTypeAsDataFieldOrder() {
+    // same as data fields order
+    List<Integer> measureIndexes = new ArrayList<>(dataFields.length);
+    int measureCount = 0;
+    for (int i = 0; i < dataFields.length; i++) {
+      if (!dataFields[i].getColumn().isDimension()) {
+        measureIndexes.add(i);
+        measureCount++;
+      }
+    }
+    DataType[] type = new DataType[measureCount];
+    for (int i = 0; i < type.length; i++) {
+      type[i] = dataFields[measureIndexes.get(i)].getColumn().getDataType();
+    }
+    return type;
+  }
+
   /**
    * Get the data types of the no dictionary and the complex dimensions of the 
table
    *
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 8586a61..75af485 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -238,7 +238,7 @@ public final class DataLoadProcessBuilder {
     
configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER,
         loadModel.getBinaryDecoder());
     if (loadModel.isLoadWithoutConverterWithoutReArrangeStep()) {
-      configuration.setDataLoadProperty("no_rearrange_of_rows",
+      
configuration.setDataLoadProperty(DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS,
           loadModel.isLoadWithoutConverterWithoutReArrangeStep());
     }
     List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
@@ -267,7 +267,7 @@ public final class DataLoadProcessBuilder {
     if (carbonTable.isHivePartitionTable()) {
       configuration.setWritingCoresCount((short) 1);
     }
-    TableSpec tableSpec = new TableSpec(carbonTable);
+    TableSpec tableSpec = new TableSpec(carbonTable, false);
     configuration.setTableSpec(tableSpec);
     if (loadModel.getSdkWriterCores() > 0) {
       configuration.setWritingCoresCount(loadModel.getSdkWriterCores());
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
index dbd9048..c7ef81b 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
@@ -38,4 +38,6 @@ public final class DataLoadProcessorConstants {
 
   public static final String FACT_FILE_PATH = "FACT_FILE_PATH";
 
+  // to indicate that it is optimized insert flow without rearrange of each 
data rows
+  public static final String NO_REARRANGE_OF_ROWS = "NO_REARRANGE_OF_ROWS";
 }
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 0f5b203..635b3b4 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -44,6 +44,7 @@ import 
org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
+import 
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import 
org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
 import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
@@ -137,7 +138,8 @@ public class CarbonRowDataWriterProcessorStepImpl extends 
AbstractDataLoadProces
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
           
.recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PARTITION_ID,
               System.currentTimeMillis());
-      if (configuration.getDataLoadProperty("no_rearrange_of_rows") != null) {
+      if (configuration.getDataLoadProperty(
+          DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS) != null) {
         initializeNoReArrangeIndexes();
       }
       if (iterators.length == 1) {
@@ -363,7 +365,8 @@ public class CarbonRowDataWriterProcessorStepImpl extends 
AbstractDataLoadProces
   private void processBatch(CarbonRowBatch batch, CarbonFactHandler 
dataHandler, int iteratorIndex)
       throws CarbonDataLoadingException {
     try {
-      if (configuration.getDataLoadProperty("no_rearrange_of_rows") != null) {
+      if (configuration.getDataLoadProperty(
+          DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS) != null) {
         // convert without re-arrange
         while (batch.hasNext()) {
           CarbonRow row = batch.next();
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index cb95226..9d41854 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -30,6 +30,7 @@ import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import 
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 import org.apache.commons.lang3.StringUtils;
@@ -158,6 +159,8 @@ public class SortParameters implements Serializable {
    */
   private int[] noDictSortColumnSchemaOrderMapping;
 
+  private boolean isInsertWithoutReArrangeFlow;
+
   public SortParameters getCopy() {
     SortParameters parameters = new SortParameters();
     parameters.tempFileLocation = tempFileLocation;
@@ -196,6 +199,7 @@ public class SortParameters implements Serializable {
     parameters.dictDimActualPosition = dictDimActualPosition;
     parameters.noDictActualPosition = noDictActualPosition;
     parameters.noDictSortColumnSchemaOrderMapping = 
noDictSortColumnSchemaOrderMapping;
+    parameters.isInsertWithoutReArrangeFlow = isInsertWithoutReArrangeFlow;
     return parameters;
   }
 
@@ -407,6 +411,14 @@ public class SortParameters implements Serializable {
     this.noDictSortColumnSchemaOrderMapping = 
noDictSortColumnSchemaOrderMapping;
   }
 
+  public boolean isInsertWithoutReArrangeFlow() {
+    return isInsertWithoutReArrangeFlow;
+  }
+
+  public void setInsertWithoutReArrangeFlow(boolean 
insertWithoutReArrangeFlow) {
+    isInsertWithoutReArrangeFlow = insertWithoutReArrangeFlow;
+  }
+
   public static SortParameters 
createSortParameters(CarbonDataLoadConfiguration configuration) {
     SortParameters parameters = new SortParameters();
     CarbonTableIdentifier tableIdentifier =
@@ -431,10 +443,6 @@ public class SortParameters implements Serializable {
         
CarbonDataProcessorUtil.getIsVarcharColumnMapping(configuration.getDataFields()));
     parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
     
parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns());
-    parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
-        .getNoDictSortColMapping(parameters.getCarbonTable()));
-    parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
-        .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
     parameters.setSortColumn(configuration.getSortColumnMapping());
     parameters.setObserver(new SortObserver());
     // get sort buffer size
@@ -482,18 +490,46 @@ public class SortParameters implements Serializable {
         CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
         CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
 
-    DataType[] measureDataType = configuration.getMeasureDataType();
-    parameters.setMeasureDataType(measureDataType);
-    parameters.setNoDictDataType(CarbonDataProcessorUtil
-        .getNoDictDataTypes(configuration.getTableSpec().getCarbonTable()));
-    Map<String, DataType[]> noDictSortAndNoSortDataTypes = 
CarbonDataProcessorUtil
-        
.getNoDictSortAndNoSortDataTypes(configuration.getTableSpec().getCarbonTable());
-    
parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
-    
parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
-    
parameters.setNoDictActualPosition(configuration.getTableSpec().getNoDictDimActualPosition());
-    
parameters.setDictDimActualPosition(configuration.getTableSpec().getDictDimActualPosition());
-    
parameters.setUpdateDictDims(configuration.getTableSpec().isUpdateDictDim());
-    
parameters.setUpdateNonDictDims(configuration.getTableSpec().isUpdateNoDictDims());
+    if 
(configuration.getDataLoadProperty(DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS)
 != null
+        && configuration.getTableSpec().getCarbonTable().getPartitionInfo() != 
null) {
+      // In case of partition, partition data will be present in the end for 
rearrange flow
+      // So, prepare the indexes and mapping as per dataFields order.
+      parameters.setInsertWithoutReArrangeFlow(true);
+      parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
+          
.getNoDictSortColMappingAsDataFieldOrder(configuration.getDataFields()));
+      parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
+          
.getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(configuration.getDataFields()));
+      
parameters.setMeasureDataType(configuration.getMeasureDataTypeAsDataFieldOrder());
+      parameters.setNoDictDataType(CarbonDataProcessorUtil
+          .getNoDictDataTypesAsDataFieldOrder(configuration.getDataFields()));
+      Map<String, DataType[]> noDictSortAndNoSortDataTypes = 
CarbonDataProcessorUtil
+          
.getNoDictSortAndNoSortDataTypesAsDataFieldOrder(configuration.getDataFields());
+      
parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
+      
parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
+      // keep partition columns in the end for table spec by getting 
rearranged tale spec
+      TableSpec tableSpec = new 
TableSpec(configuration.getTableSpec().getCarbonTable(), true);
+      
parameters.setNoDictActualPosition(tableSpec.getNoDictDimActualPosition());
+      
parameters.setDictDimActualPosition(tableSpec.getDictDimActualPosition());
+      parameters.setUpdateDictDims(tableSpec.isUpdateDictDim());
+      parameters.setUpdateNonDictDims(tableSpec.isUpdateNoDictDims());
+    } else {
+      parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
+          .getNoDictSortColMapping(parameters.getCarbonTable()));
+      parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
+          .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
+      parameters.setMeasureDataType(configuration.getMeasureDataType());
+      parameters.setNoDictDataType(CarbonDataProcessorUtil
+          .getNoDictDataTypes(configuration.getTableSpec().getCarbonTable()));
+      Map<String, DataType[]> noDictSortAndNoSortDataTypes = 
CarbonDataProcessorUtil
+          
.getNoDictSortAndNoSortDataTypes(configuration.getTableSpec().getCarbonTable());
+      
parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
+      
parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
+      TableSpec tableSpec = configuration.getTableSpec();
+      
parameters.setNoDictActualPosition(tableSpec.getNoDictDimActualPosition());
+      
parameters.setDictDimActualPosition(tableSpec.getDictDimActualPosition());
+      parameters.setUpdateDictDims(tableSpec.isUpdateDictDim());
+      parameters.setUpdateNonDictDims(tableSpec.isUpdateNoDictDims());
+    }
     return parameters;
   }
 
@@ -579,7 +615,7 @@ public class SortParameters implements Serializable {
         .getNoDictSortColMapping(parameters.getCarbonTable()));
     parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
         .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
-    TableSpec tableSpec = new TableSpec(carbonTable);
+    TableSpec tableSpec = new TableSpec(carbonTable, false);
     parameters.setNoDictActualPosition(tableSpec.getNoDictDimActualPosition());
     parameters.setDictDimActualPosition(tableSpec.getDictDimActualPosition());
     parameters.setUpdateDictDims(tableSpec.isUpdateDictDim());
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index 9f540a1..3ffc591 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -24,7 +24,10 @@ import java.util.Objects;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.processing.sort.DummyRowUpdater;
 import org.apache.carbondata.processing.sort.SchemaBasedRowUpdater;
 import org.apache.carbondata.processing.sort.SortTempRowUpdater;
@@ -120,39 +123,68 @@ public class TableFieldStat implements Serializable {
     int tmpDictNoSortCnt = 0;
     int tmpVarcharCnt = 0;
     int tmpComplexcount = 0;
-
-    List<CarbonDimension> allDimensions = 
sortParameters.getCarbonTable().getVisibleDimensions();
-    List<CarbonDimension> updatedDimensions = 
updateDimensionsBasedOnSortColumns(allDimensions);
-    for (int i = 0; i < updatedDimensions.size(); i++) {
-      CarbonDimension carbonDimension = updatedDimensions.get(i);
-      if (carbonDimension.getDataType() == DataTypes.DATE && 
!carbonDimension.isComplex()) {
-        if (carbonDimension.isSortColumn()) {
-          dictSortDimIdx[tmpDictSortCnt++] = i;
+    int tmpMeasureIndex = 0;
+
+    if (sortParameters.isInsertWithoutReArrangeFlow()
+        && sortParameters.getCarbonTable().getPartitionInfo() != null) {
+      List<ColumnSchema> reArrangedColumnSchema =
+          getReArrangedColumnSchema(sortParameters.getCarbonTable());
+      for (int i = 0; i < reArrangedColumnSchema.size(); i++) {
+        ColumnSchema columnSchema = reArrangedColumnSchema.get(i);
+        if (columnSchema.isDimensionColumn()) {
+          if (columnSchema.getDataType() == DataTypes.DATE && 
!columnSchema.getDataType()
+              .isComplexType()) {
+            if (columnSchema.isSortColumn()) {
+              dictSortDimIdx[tmpDictSortCnt++] = i;
+            } else {
+              dictNoSortDimIdx[tmpDictNoSortCnt++] = i;
+            }
+          } else if (!columnSchema.getDataType().isComplexType()) {
+            if (columnSchema.getDataType() == DataTypes.VARCHAR) {
+              varcharDimIdx[tmpVarcharCnt++] = i;
+            } else if (columnSchema.isSortColumn()) {
+              noDictSortDimIdx[tmpNoDictSortCnt++] = i;
+            } else {
+              noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i;
+            }
+          } else {
+            complexDimIdx[tmpComplexcount++] = i;
+          }
         } else {
-          dictNoSortDimIdx[tmpDictNoSortCnt++] = i;
+          measureIdx[tmpMeasureIndex++] = i;
         }
-      } else if (!carbonDimension.isComplex()) {
-        if (isVarcharDimFlags[i]) {
-          varcharDimIdx[tmpVarcharCnt++] = i;
-        } else if (carbonDimension.isSortColumn()) {
-          noDictSortDimIdx[tmpNoDictSortCnt++] = i;
+      }
+    } else {
+      List<CarbonDimension> allDimensions = 
sortParameters.getCarbonTable().getVisibleDimensions();
+      List<CarbonDimension> updatedDimensions = 
updateDimensionsBasedOnSortColumns(allDimensions);
+      for (int i = 0; i < updatedDimensions.size(); i++) {
+        CarbonDimension carbonDimension = updatedDimensions.get(i);
+        if (carbonDimension.getDataType() == DataTypes.DATE && 
!carbonDimension.isComplex()) {
+          if (carbonDimension.isSortColumn()) {
+            dictSortDimIdx[tmpDictSortCnt++] = i;
+          } else {
+            dictNoSortDimIdx[tmpDictNoSortCnt++] = i;
+          }
+        } else if (!carbonDimension.isComplex()) {
+          if (isVarcharDimFlags[i]) {
+            varcharDimIdx[tmpVarcharCnt++] = i;
+          } else if (carbonDimension.isSortColumn()) {
+            noDictSortDimIdx[tmpNoDictSortCnt++] = i;
+          } else {
+            noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i;
+          }
         } else {
-          noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i;
+          complexDimIdx[tmpComplexcount++] = i;
         }
-      } else {
-        complexDimIdx[tmpComplexcount++] = i;
+      }
+      int base = updatedDimensions.size();
+      // indices for measure columns
+      for (int i = 0; i < measureCnt; i++) {
+        measureIdx[i] = base + i;
       }
     }
-
     dictNoSortDimCnt = tmpDictNoSortCnt;
     noDictNoSortDimCnt = tmpNoDictNoSortCnt;
-
-    int base = updatedDimensions.size();
-
-    // indices for measure columns
-    for (int i = 0; i < measureCnt; i++) {
-      measureIdx[i] = base + i;
-    }
     if (sortParameters.isUpdateDictDims() || 
sortParameters.isUpdateNonDictDims()) {
       this.sortTempRowUpdater = new 
SchemaBasedRowUpdater(sortParameters.getDictDimActualPosition(),
           sortParameters.getNoDictActualPosition(), 
sortParameters.isUpdateDictDims(),
@@ -295,4 +327,29 @@ public class TableFieldStat implements Serializable {
     updatedDataFields.addAll(nonSortFields);
     return updatedDataFields;
   }
+
+  private static List<ColumnSchema> getReArrangedColumnSchema(
+      CarbonTable carbonTable) {
+    // handle 1.1 compatibility for sort columns
+    List<CarbonDimension> visibleDimensions =
+        updateDimensionsBasedOnSortColumns(carbonTable.getVisibleDimensions());
+    List<CarbonMeasure> visibleMeasures = carbonTable.getVisibleMeasures();
+    List<ColumnSchema> otherCols = new ArrayList<>();
+    if (carbonTable.getPartitionInfo() != null) {
+      List<ColumnSchema> columnSchemaList = 
carbonTable.getPartitionInfo().getColumnSchemaList();
+      for (CarbonDimension dim : visibleDimensions) {
+        if (!columnSchemaList.contains(dim.getColumnSchema())) {
+          otherCols.add(dim.getColumnSchema());
+        }
+      }
+      for (CarbonMeasure measure : visibleMeasures) {
+        if (!columnSchemaList.contains(measure.getColumnSchema())) {
+          otherCols.add(measure.getColumnSchema());
+        }
+      }
+      otherCols.addAll(columnSchemaList);
+    }
+    return otherCols;
+  }
+
 }
\ No newline at end of file
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index a95bf15..5ab3061 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -304,7 +304,7 @@ public class CarbonFactDataHandlerModel {
     
carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
     
carbonFactDataHandlerModel.setColumnCompressor(loadModel.getColumnCompressor());
 
-    carbonFactDataHandlerModel.tableSpec = new TableSpec(carbonTable);
+    carbonFactDataHandlerModel.tableSpec = new TableSpec(carbonTable, false);
     DataMapWriterListener listener = new DataMapWriterListener();
     listener.registerAllWriter(
         carbonTable,
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index c159d18..a3fbb0d 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -376,6 +376,25 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
+   * get visible no dictionary dimensions as per data field order
+   *
+   * @param dataFields
+   * @return
+   */
+  public static DataType[] getNoDictDataTypesAsDataFieldOrder(DataField[] 
dataFields) {
+    List<DataType> type = new ArrayList<>();
+    for (DataField dataField : dataFields) {
+      if (!dataField.getColumn().isInvisible() && 
dataField.getColumn().isDimension()) {
+        if (dataField.getColumn().getColumnSchema().isSortColumn()
+            && dataField.getColumn().getColumnSchema().getDataType() != 
DataTypes.DATE) {
+          type.add(dataField.getColumn().getColumnSchema().getDataType());
+        }
+      }
+    }
+    return type.toArray(new DataType[type.size()]);
+  }
+
+  /**
    * Get the no dictionary sort column mapping of the table
    *
    * @param carbonTable
@@ -402,6 +421,33 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
+   * get mapping based on data fields order
+   *
+   * @param dataFields
+   * @return
+   */
+  public static boolean[] getNoDictSortColMappingAsDataFieldOrder(DataField[] 
dataFields) {
+    List<Boolean> noDicSortColMap = new ArrayList<>();
+    for (DataField dataField : dataFields) {
+      if (!dataField.getColumn().isInvisible() && 
dataField.getColumn().isDimension()) {
+        if (dataField.getColumn().getColumnSchema().isSortColumn()) {
+          if (dataField.getColumn().getColumnSchema().getDataType() != 
DataTypes.DATE) {
+            noDicSortColMap.add(true);
+          } else {
+            noDicSortColMap.add(false);
+          }
+        }
+      }
+    }
+    Boolean[] mapping = noDicSortColMap.toArray(new Boolean[0]);
+    boolean[] noDicSortColMapping = new boolean[mapping.length];
+    for (int i = 0; i < mapping.length; i++) {
+      noDicSortColMapping[i] = mapping[i];
+    }
+    return noDicSortColMapping;
+  }
+
+  /**
    * If the dimension is added in older version 1.1, by default it will be 
sort column, So during
    * initial sorting, carbonrow will be in order where added sort column is at 
the beginning, But
    * before final merger of sort, the data should be in schema order
@@ -431,6 +477,37 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
+   * If the dimension is added in older version 1.1, by default it will be 
sort column, So during
+   * initial sorting, carbonrow will be in order where added sort column is at 
the beginning, But
+   * before final merger of sort, the data should be in schema order
+   * (org.apache.carbondata.processing.sort.SchemaBasedRowUpdater updates the 
carbonRow in schema
+   * order), so This method helps to find the index of no dictionary sort 
column in the carbonrow
+   * data.
+   */
+  public static int[] 
getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(DataField[] dataFields) {
+    List<Integer> noDicSortColMap = new ArrayList<>();
+    int counter = 0;
+    for (DataField dataField : dataFields) {
+      if (!dataField.getColumn().isInvisible() && 
dataField.getColumn().isDimension()) {
+        if (dataField.getColumn().getColumnSchema().getDataType() == 
DataTypes.DATE) {
+          continue;
+        }
+        if (dataField.getColumn().getColumnSchema().isSortColumn() && 
DataTypeUtil
+            
.isPrimitiveColumn(dataField.getColumn().getColumnSchema().getDataType())) {
+          noDicSortColMap.add(counter);
+        }
+        counter++;
+      }
+    }
+    Integer[] mapping = noDicSortColMap.toArray(new Integer[0]);
+    int[] columnIdxBasedOnSchemaInRow = new int[mapping.length];
+    for (int i = 0; i < mapping.length; i++) {
+      columnIdxBasedOnSchemaInRow[i] = mapping[i];
+    }
+    return columnIdxBasedOnSchemaInRow;
+  }
+
+  /**
    * Get the data types of the no dictionary sort columns
    *
    * @param carbonTable
@@ -458,6 +535,34 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
+   * Get the data types of the no dictionary sort columns as per dataFields 
order
+   *
+   * @param dataFields
+   * @return
+   */
+  public static Map<String, DataType[]> 
getNoDictSortAndNoSortDataTypesAsDataFieldOrder(
+      DataField[] dataFields) {
+    List<DataType> noDictSortType = new ArrayList<>();
+    List<DataType> noDictNoSortType = new ArrayList<>();
+    for (DataField dataField : dataFields) {
+      if (dataField.getColumn().isDimension()
+          && dataField.getColumn().getColumnSchema().getDataType() != 
DataTypes.DATE) {
+        if (dataField.getColumn().getColumnSchema().isSortColumn()) {
+          
noDictSortType.add(dataField.getColumn().getColumnSchema().getDataType());
+        } else {
+          
noDictNoSortType.add(dataField.getColumn().getColumnSchema().getDataType());
+        }
+      }
+    }
+    DataType[] noDictSortTypes = noDictSortType.toArray(new 
DataType[noDictSortType.size()]);
+    DataType[] noDictNoSortTypes = noDictNoSortType.toArray(new 
DataType[noDictNoSortType.size()]);
+    Map<String, DataType[]> noDictSortAndNoSortTypes = new HashMap<>(2);
+    noDictSortAndNoSortTypes.put("noDictSortDataTypes", noDictSortTypes);
+    noDictSortAndNoSortTypes.put("noDictNoSortDataTypes", noDictNoSortTypes);
+    return noDictSortAndNoSortTypes;
+  }
+
+  /**
    * This method will get the store location for the given path, segment id 
and partition id
    *
    * @return data directory path

Reply via email to