This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 93e7856  [CARBONDATA-3752] Reuse Exchange to fix performance issue
93e7856 is described below

commit 93e7856665b0f80c4441df958e89d1f5fcf4daa2
Author: QiangCai <qiang...@qq.com>
AuthorDate: Thu Mar 26 17:18:36 2020 +0800

    [CARBONDATA-3752] Reuse Exchange to fix performance issue
    
    Why is this PR needed?
    Spark ReusedExchange rule can't recognition the same Exchange plan on 
carbon table.
    So the query on the carbon table doesn't reuse Exchange, it leads to bad 
performance.
    
    For Example:
    
    create table t1(c1 int, c2 string) using carbondata
    
    explain
    select c2, sum(c1) from t1 group by c2
    union all
    select c2, sum(c1) from t1 group by c2
    physical plan as following:
    
    Union
    :- *(2) HashAggregate(keys=[c2#37], functions=[sum(cast(c1#36 as bigint))])
    : +- Exchange hashpartitioning(c2#37, 200)
    : +- *(1) HashAggregate(keys=[c2#37], functions=[partial_sum(cast(c1#36 as 
bigint))])
    : +- *(1) FileScan carbondata default.t1[c1#36,c2#37] ReadSchema: 
struct<c1:int,c2:string>
    +- *(4) HashAggregate(keys=[c2#37], functions=[sum(cast(c1#36 as bigint))])
     +- Exchange hashpartitioning(c2#37, 200)
     +- *(3) HashAggregate(keys=[c2#37], functions=[partial_sum(cast(c1#36 as 
bigint))])
     +- *(3) FileScan carbondata default.t1[c1#36,c2#37] ReadSchema: 
struct<c1:int,c2:string>
    after change, physical plan as following:
    
    Union
    :- *(2) HashAggregate(keys=[c2#37], functions=[sum(cast(c1#36 as bigint))])
    :  +- Exchange hashpartitioning(c2#37, 200)
    :     +- *(1) HashAggregate(keys=[c2#37], functions=[partial_sum(cast(c1#36 
as bigint))])
    :        +- *(1) FileScan carbondata default.t1[c1#36,c2#37] ReadSchema: 
struct<c1:int,c2:string>
    +- *(4) HashAggregate(keys=[c2#37], functions=[sum(cast(c1#36 as bigint))])
       +- ReusedExchange [c2#37, sum#54L], Exchange hashpartitioning(c2#37, 200)
    What changes were proposed in this PR?
    change CarbonFileIndex class to case class.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3681
---
 .../execution/datasources/CarbonFileIndex.scala    |  2 +-
 .../datasources/CarbonFileIndexReplaceRule.scala   |  2 +-
 .../strategy/CarbonLateDecodeStrategy.scala        |  2 +-
 .../carbondata/query/ReusedExchangeTestSuite.scala | 47 ++++++++++++++++++++++
 4 files changed, 50 insertions(+), 3 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
index 9306106..4b8f841 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
@@ -44,7 +44,7 @@ import 
org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputForma
  * Its a custom implementation which uses carbon's driver pruning feature to 
prune carbondata files
  * using carbonindex.
  */
-class CarbonFileIndex(
+case class CarbonFileIndex(
     sparkSession: SparkSession,
     dataSchema: StructType,
     parameters: Map[String, String],
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
index b0d9c07..ebce50f 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
@@ -48,7 +48,7 @@ class CarbonFileIndexReplaceRule extends Rule[LogicalPlan] {
            
!l.relation.asInstanceOf[HadoopFsRelation].location.isInstanceOf[CarbonFileIndex]
 =>
         val fsRelation = l.relation.asInstanceOf[HadoopFsRelation]
         val fileIndex = fsRelation.location
-        val carbonFileIndex = new CarbonFileIndex(fsRelation.sparkSession,
+        val carbonFileIndex = CarbonFileIndex(fsRelation.sparkSession,
           fsRelation.dataSchema,
           fsRelation.options,
           updateFileIndex(fileIndex, fsRelation))
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index e65c65d..70713ee 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -868,7 +868,7 @@ private[sql] class CarbonLateDecodeStrategy extends 
SparkStrategy {
     val sparkSession = relation.relation.sqlContext.sparkSession
     relation.catalogTable match {
       case Some(catalogTable) =>
-        val fileIndex = new CarbonFileIndex(sparkSession,
+        val fileIndex = CarbonFileIndex(sparkSession,
           catalogTable.schema,
           catalogTable.storage.properties,
           new CatalogFileIndex(
diff --git 
a/integration/spark/src/test/scala/org/apache/spark/carbondata/query/ReusedExchangeTestSuite.scala
 
b/integration/spark/src/test/scala/org/apache/spark/carbondata/query/ReusedExchangeTestSuite.scala
new file mode 100644
index 0000000..15e7b81
--- /dev/null
+++ 
b/integration/spark/src/test/scala/org/apache/spark/carbondata/query/ReusedExchangeTestSuite.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata.query
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * test ReusedExchange on carbon table
+ */
+class ReusedExchangeTestSuite extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    sql("drop table if exists t_reused_exchange")
+  }
+
+  override def afterAll(): Unit = {
+    sql("drop table if exists t_reused_exchange")
+  }
+
+  test("test ReusedExchange") {
+    sql("create table t_reused_exchange(c1 int, c2 string) using carbondata")
+    checkExistence(
+      sql(
+        """
+          | explain
+          | select c2, sum(c1) from t_reused_exchange group by c2
+          | union all
+          | select c2, sum(c1) from t_reused_exchange group by c2
+          |""".stripMargin), true, "ReusedExchange")
+  }
+}

Reply via email to