This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 00d2fe930e713958e052e3a738616a852f1dbfe7
Author: Manhua <kevin...@qq.com>
AuthorDate: Wed Sep 25 11:47:01 2019 +0800

    [CARBONDATA-3501] Fix update table with varchar column
    
    Problem
    Updating a table with a varchar column throws an exception.
    
    Analyse
    In the loading part of the update operation, it gets the isVarcharTypeMapping 
for each column in the order in which the table was created. This mapping is used 
as a hint for checking string length: a column that is not of varchar type is not 
allowed to exceed 32000 characters.
    
    However, when changing the plan for the update in CarbonIUDRule, it first 
deletes the old expression and appends the new one, which makes the column order 
differ from the order in which the table was created. As a result, the string 
length check fails.
    
    Solution
    Keep the columns in table-creation order when modifying the update plan.
    
    This closes #3398
---
 .../longstring/VarcharDataTypesBasicTestCase.scala      | 10 ++++++++++
 .../command/management/CarbonLoadDataCommand.scala      |  2 +-
 .../org/apache/spark/sql/hive/CarbonAnalysisRules.scala |  4 ++--
 .../org/apache/spark/sql/optimizer/CarbonIUDRule.scala  | 17 ++++++++++++++---
 4 files changed, 27 insertions(+), 6 deletions(-)

diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
index 4fd2cc0..9719cfc 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
@@ -389,6 +389,16 @@ class VarcharDataTypesBasicTestCase extends QueryTest with 
BeforeAndAfterEach wi
 
     sql("DROP TABLE IF EXISTS varchar_complex_table")
   }
+  
+  test("update table with long string column") {
+    prepareTable()
+    // update non-varchar column
+    sql(s"update $longStringTable set(id)=(0) where name is not null").show()
+    // update varchar column
+    sql(s"update $longStringTable set(description)=('empty') where name is not 
null").show()
+    // update non-varchar&varchar column
+    sql(s"update $longStringTable set(description, id)=('sth.', 1) where name 
is not null").show()
+  }
 
     // ignore this test in CI, because it will need at least 4GB memory to run 
successfully
   ignore("Exceed 2GB per column page for varchar datatype") {
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 6a03eab..b2f9a1e 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -1060,7 +1060,7 @@ case class CarbonLoadDataCommand(
     val dropAttributes = df.logicalPlan.output.dropRight(1)
     val finalOutput = catalogTable.schema.map { attr =>
       dropAttributes.find { d =>
-        val index = d.name.lastIndexOf("-updatedColumn")
+        val index = 
d.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)
         if (index > 0) {
           d.name.substring(0, index).equalsIgnoreCase(attr.name)
         } else {
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 9b923b0..d11bf1e 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -122,9 +122,9 @@ case class CarbonIUDAnalysisRule(sparkSession: 
SparkSession) extends Rule[Logica
         val renamedProjectList = projectList.zip(columns).map { case (attr, 
col) =>
           attr match {
             case UnresolvedAlias(child22, _) =>
-              UnresolvedAlias(Alias(child22, col + "-updatedColumn")())
+              UnresolvedAlias(Alias(child22, col + 
CarbonCommonConstants.UPDATED_COL_EXTENSION)())
             case UnresolvedAttribute(_) =>
-              UnresolvedAlias(Alias(attr, col + "-updatedColumn")())
+              UnresolvedAlias(Alias(attr, col + 
CarbonCommonConstants.UPDATED_COL_EXTENSION)())
             case _ => attr
           }
         }
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
index ae5825d..da1ca55 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
@@ -23,6 +23,8 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import 
org.apache.spark.sql.execution.command.mutation.CarbonProjectForUpdateCommand
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
 /**
  * Rule specific for IUD operations
  */
@@ -37,9 +39,9 @@ class CarbonIUDRule extends Rule[LogicalPlan] with 
PredicateHelper {
         var isTransformed = false
         val newPlan = updatePlan transform {
           case Project(pList, child) if !isTransformed =>
-            val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = 
pList
+            var (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = 
pList
               .splitAt(pList.size - cols.size)
-            val diff = cols.diff(dest.map(_.name.toLowerCase))
+            // check complex column
             cols.foreach { col =>
               val complexExists = "\"name\":\"" + col + "\""
               if (dest.exists(m => m.dataType.json.contains(complexExists))) {
@@ -47,11 +49,20 @@ class CarbonIUDRule extends Rule[LogicalPlan] with 
PredicateHelper {
                   "Unsupported operation on Complex data type")
               }
             }
+            // check updated columns exists in table
+            val diff = cols.diff(dest.map(_.name.toLowerCase))
             if (diff.nonEmpty) {
               sys.error(s"Unknown column(s) ${ diff.mkString(",") } in table 
${ table.tableName }")
             }
+            // modify plan for updated column *in place*
             isTransformed = true
-            Project(dest.filter(a => !cols.contains(a.name.toLowerCase)) ++ 
source, child)
+            source.foreach { col =>
+              val colName = col.name.substring(0,
+                
col.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))
+              val updateIdx = dest.indexWhere(_.name.equalsIgnoreCase(colName))
+              dest = dest.updated(updateIdx, col)
+            }
+            Project(dest, child)
         }
         CarbonProjectForUpdateCommand(
           newPlan, table.tableIdentifier.database, 
table.tableIdentifier.table, cols)

Reply via email to