This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 86a6e88  [CARBONDATA-3837] Fall back to the original plan when mv rewrite throws an exception
86a6e88 is described below

commit 86a6e88916eeb5995e65787021d600fa38cd17da
Author: QiangCai <qiang...@qq.com>
AuthorDate: Mon Jun 1 00:01:47 2020 +0800

    [CARBONDATA-3837] Fall back to the original plan when mv rewrite throws an exception
    
    Why is this PR needed?
    Every plan currently passes through MVRewriteRule, so if MVRewriteRule
    throws an exception, the whole query fails.
    
    What changes were proposed in this PR?
    Only queries need to be checked by MVRewriteRule; other plans should
    skip it quickly.
    
    Catch all exceptions thrown by MVRewriteRule and fall back to the
    original plan.
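    Taken together, this makes MVRewriteRule a defensive optimizer rule:
    bail out early for non-query plans, and wrap the rewrite in a try/catch
    that returns the untouched plan on any failure. A minimal sketch of that
    pattern (rewriteWithMV below is a placeholder, not the actual
    CarbonData implementation):
    
        import org.apache.spark.sql.catalyst.plans.logical.{Command, LocalRelation, LogicalPlan}
        import org.apache.spark.sql.catalyst.rules.Rule
    
        class SafeRewriteRule extends Rule[LogicalPlan] {
          override def apply(plan: LogicalPlan): LogicalPlan = plan match {
            // commands (DDL/DML) and local relations are not queries,
            // so skip them without doing any mv work
            case _: Command => plan
            case _: LocalRelation => plan
            case _ =>
              try {
                rewriteWithMV(plan)
              } catch {
                case e: Exception =>
                  // never fail the query because of the optimization;
                  // log and fall back to the original plan
                  logWarning("Failed to rewrite plan with mv: " + e.getMessage)
                  plan
              }
          }
    
          // placeholder: the real implementation matches the plan against
          // registered materialized views
          private def rewriteWithMV(plan: LogicalPlan): LogicalPlan = plan
        }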
    
    Does this PR introduce any user interface change?
    
    No
    
    Is any new testcase added?
    
    Yes
    
    This closes #3777
---
 .../apache/carbondata/core/view/MVProvider.java    | 20 ++++++++++++++---
 .../apache/carbondata/view/MVCatalogInSpark.scala  | 25 +++++++++++-----------
 .../apache/spark/sql/optimizer/MVRewriteRule.scala | 21 +++++++++++++++---
 3 files changed, 48 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
index 429f274..1259f91 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
@@ -149,14 +149,28 @@ public class MVProvider {
 
   private String getStatusFileName(MVManager viewManager, String databaseName) {
     String databaseLocation = viewManager.getDatabaseLocation(databaseName);
-    return FileFactory.getCarbonFile(databaseLocation).getCanonicalPath() +
-            CarbonCommonConstants.FILE_SEPARATOR + "_system" +
-            CarbonCommonConstants.FILE_SEPARATOR + STATUS_FILE_NAME;
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        return FileFactory.getCarbonFile(databaseLocation).getCanonicalPath()
+            + CarbonCommonConstants.FILE_SEPARATOR + "_system"
+            + CarbonCommonConstants.FILE_SEPARATOR + STATUS_FILE_NAME;
+      } else {
+        // this database folder does not exist
+        return null;
+      }
+    } catch (IOException e) {
+      // avoid impacting queries on other databases because of an mv failure on this database
+      LOG.warn("Failed to get mv status file for database " + databaseName, e);
+      return null;
+    }
   }
 
   public List<MVStatusDetail> getStatusDetails(MVManager viewManager, String databaseName)
       throws IOException {
     String statusPath = this.getStatusFileName(viewManager, databaseName);
+    if (statusPath == null) {
+      return Collections.emptyList();
+    }
     Gson gsonObjectToRead = new Gson();
     DataInputStream dataInputStream = null;
     BufferedReader buffReader = null;
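
The guard above turns a missing or unreadable database folder into a null
status path, which getStatusDetails then maps to an empty list, so an mv
failure in one database cannot fail queries on the others. The same shape
as a standalone Scala sketch (illustrative names only; File.exists stands
in for FileFactory.isFileExist, which can throw IOException):

    import java.io.IOException

    // return None instead of throwing; the real lookup can hit an
    // IOException when the database folder is unreadable
    def statusFilePath(databaseLocation: String): Option[String] =
      try {
        if (new java.io.File(databaseLocation).exists()) {
          Some(databaseLocation + "/_system/mv_status")
        } else {
          None // the database folder does not exist
        }
      } catch {
        case _: IOException => None // unreadable folder: treat as no mv status
      }

    // callers map a missing path to an empty result rather than an error
    def statusDetails(databaseLocation: String): Seq[String] =
      statusFilePath(databaseLocation) match {
        case Some(path) =>
          val source = scala.io.Source.fromFile(path)
          try source.getLines().toList finally source.close()
        case None => Nil
      }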
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
index b75a88b..c56b7e1 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
@@ -90,18 +90,19 @@ case class MVCatalogInSpark(session: SparkSession)
   def registerSchema(mvSchema: MVSchema): Unit = {
     withWriteLock {
       val currentDatabase = session.catalog.currentDatabase
-
-      // This is required because mv schemas are across databases, so while loading the
-      // catalog, if the mv is in database other than sparkSession.currentDataBase(), then it
-      // fails to register, so set the database present in the mvSchema Object
-      session.catalog.setCurrentDatabase(mvSchema.getIdentifier.getDatabaseName)
-      val logicalPlan = MVHelper.dropDummyFunction(
-        MVQueryParser.getQueryPlan(mvSchema.getQuery, session))
-      // here setting back to current database of current session, because if the actual query
-      // contains db name in query like, select db1.column1 from table and current database is
-      // default and if we drop the db1, still the session has current db as db1.
-      // So setting back to current database.
-      session.catalog.setCurrentDatabase(currentDatabase)
+      val logicalPlan = try {
+        // This is required because mv schemas are across databases, so while loading the
+        // catalog, if the mv is in database other than sparkSession.currentDataBase(), then it
+        // fails to register, so set the database present in the mvSchema Object
+        session.catalog.setCurrentDatabase(mvSchema.getIdentifier.getDatabaseName)
+        MVHelper.dropDummyFunction(MVQueryParser.getQueryPlan(mvSchema.getQuery, session))
+      } finally {
+        // here setting back to current database of current session, because if the actual query
+        // contains db name in query like, select db1.column1 from table and current database is
+        // default and if we drop the db1, still the session has current db as db1.
+        // So setting back to current database.
+        session.catalog.setCurrentDatabase(currentDatabase)
+      }
       val mvSignature = SimpleModularizer.modularize(
         BirdcageOptimizer.execute(logicalPlan)).next().semiHarmonized.signature
       val mvIdentifier = mvSchema.getIdentifier
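
The try/finally in the hunk above is a save/restore guard: it records the
session's current database, switches to the mv schema's database, and
always switches back, even when parsing the mv query throws. One way to
package that pattern as a reusable helper (hypothetical code, not part of
this commit):

    import org.apache.spark.sql.SparkSession

    // run `body` with `database` as the session's current database,
    // restoring the caller's database afterwards even if `body` throws
    def withCurrentDatabase[T](session: SparkSession, database: String)(body: => T): T = {
      val previous = session.catalog.currentDatabase
      session.catalog.setCurrentDatabase(database)
      try {
        body
      } finally {
        // restore unconditionally so a failed mv registration cannot
        // leave the session pointing at the wrong database
        session.catalog.setCurrentDatabase(previous)
      }
    }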
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
index 4bcca7d..f475a2b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, ScalaUDF}
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Command, DeserializeToObject, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Command, DeserializeToObject, LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
@@ -39,8 +39,6 @@ import org.apache.carbondata.view.MVFunctions.DUMMY_FUNCTION
  */
 class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
 
-  private val logger = MVRewriteRule.LOGGER
-
   private val catalogFactory = new MVCatalogFactory[MVSchemaWrapper] {
     override def newCatalog(): MVCatalog[MVSchemaWrapper] = {
       new MVCatalogInSpark(session)
@@ -48,6 +46,23 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
   }
 
   override def apply(logicalPlan: LogicalPlan): LogicalPlan = {
+    // only queries need to be checked by this rule
+    logicalPlan match {
+      case _: Command => return logicalPlan
+      case _: LocalRelation => return logicalPlan
+      case _ =>
+    }
+    try {
+      tryRewritePlan(logicalPlan)
+    } catch {
+      case e =>
+        // if an exception is thrown while rewriting the query, fall back to the original query plan.
+        MVRewriteRule.LOGGER.warn("Failed to rewrite plan with mv: " + e.getMessage)
+        logicalPlan
+    }
+  }
+
+  private def tryRewritePlan(logicalPlan: LogicalPlan): LogicalPlan = {
     var canApply = true
     logicalPlan.transformAllExpressions {
       // first check if any mv UDF is applied and is present in the plan
