This is an automated email from the ASF dual-hosted git repository. ajantha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 93e7856 [CARBONDATA-3752] Reuse Exchange to fix performance issue 93e7856 is described below commit 93e7856665b0f80c4441df958e89d1f5fcf4daa2 Author: QiangCai <qiang...@qq.com> AuthorDate: Thu Mar 26 17:18:36 2020 +0800 [CARBONDATA-3752] Reuse Exchange to fix performance issue Why is this PR needed? Spark's ReusedExchange rule can't recognize the same Exchange plan on a carbon table. So a query on a carbon table doesn't reuse the Exchange, which leads to bad performance. For example: create table t1(c1 int, c2 string) using carbondata explain select c2, sum(c1) from t1 group by c2 union all select c2, sum(c1) from t1 group by c2 The physical plan is as follows: Union :- *(2) HashAggregate(keys=[c2#37], functions=[sum(cast(c1#36 as bigint))]) : +- Exchange hashpartitioning(c2#37, 200) : +- *(1) HashAggregate(keys=[c2#37], functions=[partial_sum(cast(c1#36 as bigint))]) : +- *(1) FileScan carbondata default.t1[c1#36,c2#37] ReadSchema: struct<c1:int,c2:string> +- *(4) HashAggregate(keys=[c2#37], functions=[sum(cast(c1#36 as bigint))]) +- Exchange hashpartitioning(c2#37, 200) +- *(3) HashAggregate(keys=[c2#37], functions=[partial_sum(cast(c1#36 as bigint))]) +- *(3) FileScan carbondata default.t1[c1#36,c2#37] ReadSchema: struct<c1:int,c2:string> After the change, the physical plan is as follows: Union :- *(2) HashAggregate(keys=[c2#37], functions=[sum(cast(c1#36 as bigint))]) : +- Exchange hashpartitioning(c2#37, 200) : +- *(1) HashAggregate(keys=[c2#37], functions=[partial_sum(cast(c1#36 as bigint))]) : +- *(1) FileScan carbondata default.t1[c1#36,c2#37] ReadSchema: struct<c1:int,c2:string> +- *(4) HashAggregate(keys=[c2#37], functions=[sum(cast(c1#36 as bigint))]) +- ReusedExchange [c2#37, sum#54L], Exchange hashpartitioning(c2#37, 200) What changes were proposed in this PR? Change the CarbonFileIndex class to a case class. Does this PR introduce any user interface change? No Is any new testcase added? 
Yes This closes #3681 --- .../execution/datasources/CarbonFileIndex.scala | 2 +- .../datasources/CarbonFileIndexReplaceRule.scala | 2 +- .../strategy/CarbonLateDecodeStrategy.scala | 2 +- .../carbondata/query/ReusedExchangeTestSuite.scala | 47 ++++++++++++++++++++++ 4 files changed, 50 insertions(+), 3 deletions(-) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala b/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala index 9306106..4b8f841 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala @@ -44,7 +44,7 @@ import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputForma * Its a custom implementation which uses carbon's driver pruning feature to prune carbondata files * using carbonindex. 
*/ -class CarbonFileIndex( +case class CarbonFileIndex( sparkSession: SparkSession, dataSchema: StructType, parameters: Map[String, String], diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala b/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala index b0d9c07..ebce50f 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala @@ -48,7 +48,7 @@ class CarbonFileIndexReplaceRule extends Rule[LogicalPlan] { !l.relation.asInstanceOf[HadoopFsRelation].location.isInstanceOf[CarbonFileIndex] => val fsRelation = l.relation.asInstanceOf[HadoopFsRelation] val fileIndex = fsRelation.location - val carbonFileIndex = new CarbonFileIndex(fsRelation.sparkSession, + val carbonFileIndex = CarbonFileIndex(fsRelation.sparkSession, fsRelation.dataSchema, fsRelation.options, updateFileIndex(fileIndex, fsRelation)) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala index e65c65d..70713ee 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala @@ -868,7 +868,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { val sparkSession = relation.relation.sqlContext.sparkSession relation.catalogTable match { case Some(catalogTable) => - val fileIndex = new CarbonFileIndex(sparkSession, + val fileIndex = CarbonFileIndex(sparkSession, catalogTable.schema, catalogTable.storage.properties, new 
CatalogFileIndex( diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/query/ReusedExchangeTestSuite.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/query/ReusedExchangeTestSuite.scala new file mode 100644 index 0000000..15e7b81 --- /dev/null +++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/query/ReusedExchangeTestSuite.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.carbondata.query + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +/** + * test ReusedExchange on carbon table + */ +class ReusedExchangeTestSuite extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(): Unit = { + sql("drop table if exists t_reused_exchange") + } + + override def afterAll(): Unit = { + sql("drop table if exists t_reused_exchange") + } + + test("test ReusedExchange") { + sql("create table t_reused_exchange(c1 int, c2 string) using carbondata") + checkExistence( + sql( + """ + | explain + | select c2, sum(c1) from t_reused_exchange group by c2 + | union all + | select c2, sum(c1) from t_reused_exchange group by c2 + |""".stripMargin), true, "ReusedExchange") + } +}