This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 8842a9e [CARBONDATA-3630] update should support limit 1 sub query and empty result subquery 8842a9e is described below commit 8842a9e6c233ad0237dfe23018e9c786c7c8f70b Author: ajantha-bhat <ajanthab...@gmail.com> AuthorDate: Mon Dec 23 20:34:01 2019 +0800 [CARBONDATA-3630] update should support limit 1 sub query and empty result subquery Currently update has two flows: update by value and update by join. a. Update by join should be used only when the subquery joins with the main table; previously the join flow was also used in scenarios where the subquery does not join with the main table. Fixed this. b. Previously a subquery with limit 1 was not supported because it went through the join with the main table; this is now supported. c. If a subquery with limit 1 also joins with the main table, the current design cannot handle it, so an exception is thrown. d. A subquery returning 0 rows now updates the column to null (same behavior as MySQL). This closes #3528 --- .../testsuite/iud/UpdateCarbonTableTestCase.scala | 54 ++++++++++++++- .../spark/sql/parser/CarbonSpark2SqlParser.scala | 76 ++++++++++++++++++++-- 2 files changed, 123 insertions(+), 7 deletions(-) diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala index cae86c0..0f131b1 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala @@ -19,7 +19,7 @@ package org.apache.carbondata.spark.testsuite.iud import java.io.File import org.apache.spark.sql.test.Spark2TestQueryExecutor -import org.apache.spark.sql.{CarbonEnv, Row, SaveMode} +import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SaveMode} import 
org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} @@ -80,6 +80,58 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql("""drop table iud.dest11""").show } + test("update with subquery having limit 1") { + sql("drop table if exists t1") + sql("drop table if exists t2") + sql("create table t1 (age int, name string) stored by 'carbondata'") + sql("insert into t1 select 1, 'aa'") + sql("insert into t1 select 3, 'bb'") + sql("create table t2 (age int, name string) stored by 'carbondata'") + sql("insert into t2 select 3, 'Andy'") + sql("insert into t2 select 2, 'Andy'") + sql("insert into t2 select 1, 'aa'") + sql("insert into t2 select 3, 'aa'") + sql("update t1 set (age) = " + + "(select t2.age from t2 where t2.name = 'Andy' order by age limit 1) " + + "where t1.age = 1 ").show(false) + checkAnswer(sql("select * from t1"), Seq(Row(2,"aa"), Row(3,"bb"))) + sql("drop table if exists t1") + sql("drop table if exists t2") + } + + test("update with subquery giving 0 rows") { + sql("drop table if exists t1") + sql("drop table if exists t2") + sql("create table t1 (age int, name string) stored by 'carbondata'") + sql("insert into t1 select 1, 'aa'") + sql("create table t2 (age int, name string) stored by 'carbondata'") + sql("insert into t2 select 3, 'Andy'") + sql("update t1 set (age) = " + + "(select t2.age from t2 where t2.age != 3) " + + "where t1.age = 1 ").show(false) + // should update to null + checkAnswer(sql("select * from t1"), Seq(Row(null,"aa"))) + sql("drop table if exists t1") + sql("drop table if exists t2") + } + + test("update with subquery joing with main table and limit") { + sql("drop table if exists t1") + sql("drop table if exists t2") + sql("create table t1 (age int, name string) stored by 'carbondata'") + sql("insert into t1 select 1, 'Andy'") + sql("create table t2 (age int, name string) stored by 'carbondata'") + sql("insert into t2 select 3, 
'Andy'") + intercept[AnalysisException] { + sql("update t1 set (age) = " + + "(select t2.age from t2 where t2.name = t1.name limit 1) " + + "where t1.age = 1 ").show(false) + }.getMessage.contains("Update subquery has join with maintable " + + "and limit leads to multiple join for each limit for each row") + sql("drop table if exists t1") + sql("drop table if exists t2") + } + test("update carbon table[using destination table columns with where and exist]") { sql("""drop table if exists iud.dest22""") sql("""create table iud.dest22 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 69145d8..10b661a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.parser import scala.collection.mutable import scala.language.implicitConversions -import org.apache.spark.sql.{CarbonToSparkAdapter, DeleteRecords, UpdateTable} +import org.apache.commons.lang3.StringUtils +import org.apache.spark.sql.{CarbonToSparkAdapter, Dataset, DeleteRecords, ProjectForUpdate, SparkSession, UpdateTable} import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier} import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -31,7 +32,7 @@ import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnC import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand import org.apache.spark.sql.types.StructField import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import 
org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation} import org.apache.spark.sql.execution.command.cache.{CarbonDropCacheCommand, CarbonShowCacheCommand} import org.apache.spark.sql.execution.command.stream.{CarbonCreateStreamCommand, CarbonDropStreamCommand, CarbonShowStreamsCommand} import org.apache.spark.sql.util.CarbonException @@ -247,8 +248,67 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { case tab ~ columns ~ rest => val (sel, where) = splitQuery(rest) val selectPattern = """^\s*select\s+""".r + // In case of "update = (subquery) where something" + // If subquery has join with main table, then only it should go to "update by join" flow. + // Else it should go to "update by value" flow. + // In update by value flow, we need values to update. + // so need to execute plan and collect values from subquery if is not join with main table. + var subQueryResults : String = "" + if (selectPattern.findFirstIn(sel.toLowerCase).isDefined) { + // subQuery starts with select + val mainTableName = tab._4.table + val mainTableAlias = if (tab._3.isDefined) { + tab._3.get + } else { + "" + } + val session = SparkSession.getActiveSession.get + val subQueryUnresolvedLogicalPlan = session.sessionState.sqlParser.parsePlan(sel) + var isJoinWithMainTable : Boolean = false + var isLimitPresent : Boolean = false + subQueryUnresolvedLogicalPlan collect { + case f: Filter => + f.condition.collect { + case attr: UnresolvedAttribute => + if ((!StringUtils.isEmpty(mainTableAlias) && + attr.nameParts.head.equalsIgnoreCase(mainTableAlias)) || + attr.nameParts.head.equalsIgnoreCase(mainTableName)) { + isJoinWithMainTable = true + } + } + case _: GlobalLimit => + isLimitPresent = true + } + if (isJoinWithMainTable && isLimitPresent) { + throw new UnsupportedOperationException( + "Update subquery has join with maintable and limit leads to multiple join for each " + + "limit for each row") + } + if (!isJoinWithMainTable) { + // Should go as value 
update, not as join update. So execute the sub query. + val analyzedPlan = CarbonReflectionUtils.invokeAnalyzerExecute(session + .sessionState + .analyzer, subQueryUnresolvedLogicalPlan) + val subQueryLogicalPlan = session.sessionState.optimizer.execute(analyzedPlan) + val df = Dataset.ofRows(session, subQueryLogicalPlan) + val rowsCount = df.count() + if (rowsCount == 0L) { + // if length = 0, update to null + subQueryResults = "null" + } else if (rowsCount != 1) { + throw new UnsupportedOperationException( + " update cannot be supported for 1 to N mapping, as more than one value present " + + "for the update key") + } else { + subQueryResults = "'" + df.collect()(0).toSeq.mkString("','") + "'" + } + } + } val (selectStmt, relation) = - if (!selectPattern.findFirstIn(sel.toLowerCase).isDefined) { + if (!selectPattern.findFirstIn(sel.toLowerCase).isDefined || + !StringUtils.isEmpty(subQueryResults)) { + // if subQueryResults are not empty means, it is not join with main table. + // so use subQueryResults in update with value flow. if (sel.trim.isEmpty) { sys.error("At least one source column has to be specified ") } @@ -262,12 +322,16 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { } case _ => tab._1 } - + val newSel = if (!StringUtils.isEmpty(subQueryResults)) { + subQueryResults + } else { + sel + } tab._3 match { case Some(a) => - ("select " + sel + " from " + getTableName(tab._2) + " " + tab._3.get, relation) + ("select " + newSel + " from " + getTableName(tab._2) + " " + tab._3.get, relation) case None => - ("select " + sel + " from " + getTableName(tab._2), relation) + ("select " + newSel + " from " + getTableName(tab._2), relation) } } else {