Fixed an issue with DELETE statements whose WHERE clause uses an IN subquery
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8ceb069e Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8ceb069e Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8ceb069e Branch: refs/heads/branch-1.1 Commit: 8ceb069ed98f97c31dfbdab1be7cde223a6a4a4c Parents: f701521 Author: ravipesala <ravi.pes...@gmail.com> Authored: Sun Jun 18 00:49:40 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Sun Jun 18 14:14:04 2017 +0530 ---------------------------------------------------------------------- .../iud/DeleteCarbonTableTestCase.scala | 24 -------- .../sql/optimizer/CarbonLateDecodeRule.scala | 39 ++++++------ .../iud/DeleteCarbonTableSubqueryTestCase.scala | 63 ++++++++++++++++++++ 3 files changed, 85 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/8ceb069e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala index 0346067..2e59c9c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala @@ -25,8 +25,6 @@ import org.apache.carbondata.core.util.CarbonProperties class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { override def beforeAll { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "false") 
sql("use default") sql("drop database if exists iud_db cascade") sql("create database iud_db") @@ -97,26 +95,6 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { ) } -// test("delete data from carbon table[where IN (sub query) ]") { -// sql("""drop table if exists iud_db.dest""") -// sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show() -// sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud_db.dest""") -// sql("""delete from iud_db.dest where c1 IN (select c11 from source2)""").show(truncate = false) -// checkAnswer( -// sql("""select c1 from iud_db.dest"""), -// Seq(Row("c"), Row("d"), Row("e")) -// ) -// } -// test("delete data from carbon table[where IN (sub query with where clause) ]") { -// sql("""drop table if exists iud_db.dest""") -// sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show() -// sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud_db.dest""") -// sql("""delete from iud_db.dest where c1 IN (select c11 from source2 where c11 = 'b')""").show() -// checkAnswer( -// sql("""select c1 from iud_db.dest"""), -// Seq(Row("a"), Row("c"), Row("d"), Row("e")) -// ) -// } test("delete data from carbon table[where numeric condition ]") { sql("""drop table if exists iud_db.dest""") sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") @@ -128,8 +106,6 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { ) } override def afterAll { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true") sql("use default") sql("drop database if exists iud_db cascade") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8ceb069e/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala index 7a6c513..ae2e46b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala @@ -86,29 +86,34 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { } private def pushDownUDFToJoinLeftRelation(plan: LogicalPlan): LogicalPlan = { - val output = plan match { + val output = plan.transform { case proj@Project(cols, Join( left, right, jointype: org.apache.spark.sql.catalyst.plans.JoinType, condition)) => var projectionToBeAdded: Seq[org.apache.spark.sql.catalyst.expressions.Alias] = Seq.empty - val newCols = cols.map { col => - col match { - case a@Alias(s: ScalaUDF, name) - if (name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) || - name.equalsIgnoreCase( - CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) => - projectionToBeAdded :+= a - AttributeReference(name, StringType, true)().withExprId(a.exprId) + var udfExists = false + val newCols = cols.map { + case a@Alias(s: ScalaUDF, name) + if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) || + name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) => + udfExists = true + projectionToBeAdded :+= a + AttributeReference(name, StringType, nullable = true)().withExprId(a.exprId) + case other => other + } + if (udfExists) { + val newLeft = left match { + case Project(columns, logicalPlan) => + Project(columns ++ projectionToBeAdded, logicalPlan) + case filter: Filter => + Project(filter.output ++ projectionToBeAdded, filter) + case relation: LogicalRelation => + Project(relation.output ++ projectionToBeAdded, relation) case other => other } + 
Project(newCols, Join(newLeft, right, jointype, condition)) + } else { + proj } - val newLeft = left match { - case Project(columns, logicalPlan) => - Project(columns ++ projectionToBeAdded, logicalPlan) - case filter: Filter => - Project(filter.output ++ projectionToBeAdded, filter) - case other => other - } - Project(newCols, Join(newLeft, right, jointype, condition)) case other => other } output http://git-wip-us.apache.org/repos/asf/carbondata/blob/8ceb069e/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala new file mode 100644 index 0000000..ff6196c --- /dev/null +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.carbondata.iud + +import org.apache.spark.sql.Row +import org.apache.spark.sql.common.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class DeleteCarbonTableSubqueryTestCase extends QueryTest with BeforeAndAfterAll { + override def beforeAll { + sql("use default") + sql("drop database if exists iud_db_sub cascade") + sql("create database iud_db_sub") + + sql("""create table iud_db_sub.source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source2.csv' INTO table iud_db_sub.source2""") + sql("use iud_db_sub") + } + + test("delete data from carbon table[where IN (sub query) ]") { + sql("""drop table if exists iud_db_sub.dest""") + sql("""create table iud_db_sub.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show() + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud_db_sub.dest""") + sql("""delete from iud_db_sub.dest where c1 IN (select c11 from source2)""").show(truncate = false) + checkAnswer( + sql("""select c1 from iud_db_sub.dest"""), + Seq(Row("c"), Row("d"), Row("e")) + ) + } + + test("delete data from carbon table[where IN (sub query with where clause) ]") { + sql("""drop table if exists iud_db_sub.dest""") + sql("""create table iud_db_sub.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show() + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud_db_sub.dest""") + sql("""delete from iud_db_sub.dest where c1 IN (select c11 from source2 where c11 = 'b')""").show() + checkAnswer( + sql("""select c1 from iud_db_sub.dest"""), + Seq(Row("a"), Row("c"), Row("d"), Row("e")) + ) + } + + override def afterAll { + sql("use default") + sql("drop database if exists iud_db_sub 
cascade") + } +} \ No newline at end of file