This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8865671  [CARBONDATA-3666] Avoided listing of table dir in refresh command
8865671 is described below

commit 8865671c32b1cf450ecc1fdc8c278904fe4a8c3f
Author: kunal642 <kunalkapoor...@gmail.com>
AuthorDate: Thu Jan 16 14:28:57 2020 +0530

    [CARBONDATA-3666] Avoided listing of table dir in refresh command
    
    Why is this PR needed?
    Currently, if a refresh command is fired on a parquet table using a carbon
    session, carbon lists all the tables to check whether the table exists, and
    then lists the Metadata folder to check whether the schema file exists.
    This can be a problem in cloud scenarios because listing on S3 is slow.
    
    What changes were proposed in this PR?
    Get the metadata for the specified table from the catalog, then go for
    table-directory listing only if the provider is carbon or the table is not
    registered in Hive (see the sketch below).
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3581
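
    In essence, the new check boils down to the minimal sketch below, condensed
    from the diff that follows (sparkSession, tableName and databaseNameOp are
    the members of RefreshCarbonTableCommand):

        import org.apache.spark.sql.catalyst.TableIdentifier
        import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
        import org.apache.carbondata.spark.util.CommonUtil

        // Ask the catalog for this one table instead of listing every table
        // in the database.
        val isCarbonDataSource = try {
          CommonUtil.isCarbonDataSource(sparkSession.sessionState.catalog
            .getTableMetadata(TableIdentifier(tableName, databaseNameOp)))
        } catch {
          // A table unknown to Hive may still be a stale carbon table, so
          // fall through to the schema-file check.
          case _: NoSuchTableException => true
        }
        if (isCarbonDataSource) {
          // only now list the table dir to look for the schema file
        }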
---
 .../apache/carbondata/spark/util/CommonUtil.scala  | 10 +++++++
 .../apache/spark/sql/hive/CarbonSessionUtil.scala  | 26 +++++++++---------
 .../management/RefreshCarbonTableCommand.scala     | 31 +++++++++++++---------
 .../spark/sql/execution/strategy/DDLStrategy.scala | 16 +++++------
 .../spark/sql/hive/CarbonFileMetastore.scala       | 18 +++++--------
 5 files changed, 53 insertions(+), 48 deletions(-)

diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index f0fe08b..e70fc24 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
 import org.apache.spark.util.FileUtils
 
@@ -832,4 +833,13 @@ object CommonUtil {
     }
     displaySize
   }
+
+  def isCarbonDataSource(catalogTable: CatalogTable): Boolean = {
+    catalogTable.provider match {
+      case Some(x) => x.equalsIgnoreCase("org.apache.spark.sql.CarbonSource") ||
+                      x.equalsIgnoreCase("carbondata")
+      case None => false
+    }
+  }
+
 }
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
index e3f1d3f..968738a 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -35,6 +35,8 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
+import org.apache.carbondata.spark.util.CommonUtil
+
 /**
  * This class refresh the relation from cache if the carbontable in
  * carbon catalog is not same as cached carbon relation's carbon table.
@@ -59,20 +61,16 @@ object CarbonSessionUtil {
      * Set the stats to none in case of carbontable
      */
     def setStatsNone(catalogTable: CatalogTable): Unit = {
-      catalogTable.provider match {
-        case Some(provider)
-          if provider.equals("org.apache.spark.sql.CarbonSource") ||
-             provider.equalsIgnoreCase("carbondata") =>
-          // Update stats to none in case of carbon table as we are not expecting any stats from
-          // Hive. Hive gives wrong stats for carbon table.
-          catalogTable.stats match {
-            case Some(stats) =>
-              CarbonReflectionUtils.setFieldToCaseClass(catalogTable, "stats", None)
-            case _ =>
-          }
-          isRelationRefreshed =
-            CarbonEnv.isRefreshRequired(catalogTable.identifier)(sparkSession)
-        case _ =>
+      if (CommonUtil.isCarbonDataSource(catalogTable)) {
+        // Update stats to none in case of carbon table as we are not expecting any stats from
+        // Hive. Hive gives wrong stats for carbon table.
+        catalogTable.stats match {
+          case Some(stats) =>
+            CarbonReflectionUtils.setFieldToCaseClass(catalogTable, "stats", None)
+          case _ =>
+        }
+        isRelationRefreshed =
+          CarbonEnv.isRefreshRequired(catalogTable.identifier)(sparkSession)
       }
     }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index 9251cf0..17e628f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, MetadataCommand}
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
@@ -41,6 +42,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
+import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Command to register carbon table from existing carbon table data
@@ -52,24 +54,31 @@ case class RefreshCarbonTableCommand(
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
     val databaseName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     setAuditTable(databaseName, tableName)
     // Steps
-    // 1. get table path
-    // 2. perform the below steps
-    // 2.1 check if the table already register with hive then ignore and continue with the next
-    // schema
+    // 1. Get Table Metadata from spark.
+    // 2. Perform the below steps:
+    // 2.1 If the table exists then check if the provider is carbon. If yes then go for carbon
+    // refresh, otherwise nothing needs to be done.
+    // 2.1.1 If the table does not exist then consider the table as carbon and check for schema file
+    // existence.
    // 2.2 register the table with the hive check if the table being registered has aggregate table
     // then do the below steps
    // 2.2.1 validate that all the aggregate tables are copied at the store location.
     // 2.2.2 Register the aggregate tables
-    val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName.toLowerCase)(sparkSession)
-    val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName.toLowerCase)
    // 2.1 check if the table already register with hive then ignore and continue with the next
     // schema
-    if (!sparkSession.sessionState.catalog.listTables(databaseName)
-      .exists(_.table.equalsIgnoreCase(tableName))) {
+    val isCarbonDataSource = try {
+      CommonUtil.isCarbonDataSource(sparkSession.sessionState.catalog
+        .getTableMetadata(TableIdentifier(tableName, databaseNameOp)))
+    } catch {
+      case _: NoSuchTableException =>
+        true
+    }
+    if (isCarbonDataSource) {
+      val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName.toLowerCase)(sparkSession)
+      val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName.toLowerCase)
       // check the existence of the schema file to know its a carbon table
      val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
      // if schema file does not exist then the table will either non carbon table or stale
@@ -106,9 +115,7 @@ case class RefreshCarbonTableCommand(
         }
       }
     }
-    RefreshTable(
-      TableIdentifier(identifier.getTableName, Option(identifier.getDatabaseName))
-    ).run(sparkSession)
+    RefreshTable(TableIdentifier(tableName, Option(databaseName))).run(sparkSession)
   }
 
   /**
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 80d3044..68f7442 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, C
 import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Carbon strategies for ddl commands
@@ -147,20 +149,20 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         if isCarbonTable(truncateTable.tableName) =>
         ExecutedCommandExec(CarbonTruncateCommand(truncateTable)) :: Nil
      case createTable@org.apache.spark.sql.execution.datasources.CreateTable(_, _, None)
-        if isCarbonDataSourceTable(createTable.tableDesc) =>
+        if CommonUtil.isCarbonDataSource(createTable.tableDesc) =>
        ExecutedCommandExec(DDLHelper.createDataSourceTable(createTable, sparkSession)) :: Nil
       case MatchCreateDataSourceTable(tableDesc, mode, query)
-        if isCarbonDataSourceTable(tableDesc) =>
+        if CommonUtil.isCarbonDataSource(tableDesc) =>
         ExecutedCommandExec(
          DDLHelper.createDataSourceTableAsSelect(tableDesc, query, mode, sparkSession)
         ) :: Nil
      case org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, query)
-        if isCarbonDataSourceTable(tableDesc) =>
+        if CommonUtil.isCarbonDataSource(tableDesc) =>
         ExecutedCommandExec(
          DDLHelper.createDataSourceTableAsSelect(tableDesc, query.get, mode, sparkSession)
         ) :: Nil
       case createTable@CreateDataSourceTableCommand(table, _)
-        if isCarbonDataSourceTable(table) =>
+        if CommonUtil.isCarbonDataSource(table) =>
         ExecutedCommandExec(
           DDLHelper.createDataSourceTable(createTable, sparkSession)
         ) :: Nil
@@ -195,12 +197,6 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
     CarbonPlanHelper.isCarbonTable(tableIdent, sparkSession)
   }
 
-  private def isCarbonDataSourceTable(table: CatalogTable): Boolean = {
-    table.provider.get != DDLUtils.HIVE_PROVIDER &&
-    (table.provider.get.equals("org.apache.spark.sql.CarbonSource") ||
-     table.provider.get.equalsIgnoreCase("carbondata"))
-  }
-
   private def isCarbonHiveTable(table: CatalogTable): Boolean = {
     table.provider.isDefined &&
     DDLUtils.HIVE_PROVIDER == table.provider.get &&
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 6c7b1f2..15b2a51 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -52,7 +52,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.util.CarbonSparkUtil
+import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil}
 
 case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) {
   // use to lock the carbonTables
@@ -216,12 +216,9 @@ class CarbonFileMetastore extends CarbonMetaStore {
               "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
         val catalogTable =
           CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", 
c).asInstanceOf[CatalogTable]
-        catalogTable.provider match {
-          case Some(name) if (name.equals("org.apache.spark.sql.CarbonSource")
-            || name.equalsIgnoreCase("carbondata")) => name
-          case _ =>
-            CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table)
-            throw new NoSuchTableException(database, tableIdentifier.table)
+        if (!CommonUtil.isCarbonDataSource(catalogTable)) {
+          CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table)
+          throw new NoSuchTableException(database, tableIdentifier.table)
         }
         val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
            catalogTable.location.toString, database, tableIdentifier.table)
@@ -540,11 +537,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
               "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
         val catalogTable =
           CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", 
c).asInstanceOf[CatalogTable]
-        catalogTable.provider match {
-          case Some(name) if (name.equals("org.apache.spark.sql.CarbonSource")
-            || name.equalsIgnoreCase("carbondata")) => name
-          case _ =>
-            throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table)
+        if (!CommonUtil.isCarbonDataSource(catalogTable)) {
+          throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table)
         }
         val tableLocation = catalogTable.storage.locationUri match {
           case tableLoc@Some(uri) =>
