This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ff1e1b  [CARBONDATA-3734] Fix insert failure on partition table when 
partition column is in sort column
3ff1e1b is described below

commit 3ff1e1beeabfc165f9c6ab818969cb4fb023f7a6
Author: ajantha-bhat <ajanthab...@gmail.com>
AuthorDate: Mon Mar 2 19:44:45 2020 +0530

    [CARBONDATA-3734] Fix insert failure on partition table when partition 
column is in sort column
    
    Why is this PR needed?
    
    Insert into a partition table fails when a partition column is also a sort column:
    
    a) Currently the sort columns are not at the head of the attributes, because
    partition columns are placed at the end for global sort, so the sort column
    attributes need to be looked up accordingly.
    
    b) While rearranging the data fields at the executor, the partition column
    needs to be kept at the end even though it is a sort column.
    
    What changes were proposed in this PR?
    Prepare the sort columns for global sort based on the table's sort columns.
    Keep partition columns at the end of the data field attributes, even when
    they are also sort columns.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No (this scenario is already covered by an existing MV test case)
    
    This closes #3654
---
 .../command/management/CommonLoadUtils.scala       | 14 ++++++++++++-
 .../mv/rewrite/TestPartitionWithMV.scala           |  3 +++
 .../processing/loading/DataLoadProcessBuilder.java | 24 ++++++++++++++++++----
 3 files changed, 36 insertions(+), 5 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index a0b2676..40a732a 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -504,7 +504,19 @@ object CommonLoadUtils {
       if (numPartitions <= 0) {
         numPartitions = partitionsLen
       }
-      val sortColumns = attributes.take(table.getSortColumns.size())
+      val sortColumns =
+        if (table.isHivePartitionTable) {
+          // In case of partition column as sort column, attribute will not 
have it in the front.
+          // so need to look up the attribute and prepare
+          val sortColsAttr: collection.mutable.ArrayBuffer[AttributeReference] 
= ArrayBuffer()
+          val sortCols = table.getSortColumns.asScala
+          for (sortColumn <- sortCols) {
+            sortColsAttr += attributes.find(x => 
x.name.equalsIgnoreCase(sortColumn)).get
+          }
+          sortColsAttr
+        } else {
+          attributes.take(table.getSortColumns.size())
+        }
       val dataTypes = sortColumns.map(_.dataType)
       val sortedRDD: RDD[InternalRow] =
         GlobalSortHelper.sortBy(updatedRdd, numPartitions, dataTypes)
diff --git 
a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
 
b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
index c781598..bf12589 100644
--- 
a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
+++ 
b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
@@ -637,7 +637,10 @@ class TestPartitionWithMV extends QueryTest with 
BeforeAndAfterAll {
     sql("insert into updatetime_8 select 21,21,'fbv','gbv','wvsw','vwr',25")
     sql("alter table updatetime_8 compact 'minor'")
     sql("alter table updatetime_8 compact 'minor'")
+    val df = sql("select sum(hs_len) from updatetime_8 group by imex")
+    assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "ag"))
     checkAnswer(sql("select sum(hs_len) from updatetime_8 group by 
imex"),Seq(Row(40),Row(42),Row(83)))
+    sql("drop table updatetime_8").show(200, false)
   }
 
   test("check partitioning for child tables with various combinations") {
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index ab41c40..966f828 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -252,9 +252,9 @@ public final class DataLoadProcessBuilder {
           partitionColumns, dataFields);
     } else {
       getDataFields(loadModel, dimensions, measures, complexDataFields, 
dataFields);
+      dataFields = updateDataFieldsBasedOnSortColumns(dataFields);
     }
-    configuration.setDataFields(
-        updateDataFieldsBasedOnSortColumns(dataFields).toArray(new 
DataField[dataFields.size()]));
+    configuration.setDataFields(dataFields.toArray(new DataField[0]));
     configuration.setBucketingInfo(carbonTable.getBucketingInfo());
     configuration.setBucketHashMethod(carbonTable.getBucketHashMethod());
     configuration.setPreFetch(loadModel.isPreFetch());
@@ -326,6 +326,10 @@ public final class DataLoadProcessBuilder {
     } else {
       partitionColumnSchemaList = new ArrayList<>();
     }
+    // 1.1 compatibility, dimensions will not have sort columns in the 
beginning in 1.1.
+    // Need to keep at the beginning now
+    List<DataField> sortDataFields = new ArrayList<>();
+    List<DataField> noSortDataFields = new ArrayList<>();
     for (CarbonColumn column : dimensions) {
       DataField dataField = new DataField(column);
       if (column.isComplex()) {
@@ -356,11 +360,23 @@ public final class DataLoadProcessBuilder {
             .contains(column.getColumnSchema())) {
           partitionColumns.add(dataField);
         } else {
-          dataFields.add(dataField);
+          if (dataField.getColumn().getColumnSchema().isSortColumn()) {
+            sortDataFields.add(dataField);
+          } else {
+            noSortDataFields.add(dataField);
+          }
         }
       }
     }
-    dataFields.addAll(complexDataFields);
+    if (sortDataFields.size() != 0) {
+      dataFields.addAll(sortDataFields);
+    }
+    if (noSortDataFields.size() != 0) {
+      dataFields.addAll(noSortDataFields);
+    }
+    if (complexDataFields.size() != 0) {
+      dataFields.addAll(complexDataFields);
+    }
     for (CarbonColumn column : measures) {
       if (partitionColumnSchemaList.size() != 0 && partitionColumnSchemaList
           .contains(column.getColumnSchema())) {

Reply via email to