This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 8865671 [CARBONDATA-3666] Avoided listing of table dir in refresh command 8865671 is described below commit 8865671c32b1cf450ecc1fdc8c278904fe4a8c3f Author: kunal642 <kunalkapoor...@gmail.com> AuthorDate: Thu Jan 16 14:28:57 2020 +0530 [CARBONDATA-3666] Avoided listing of table dir in refresh command Why is this PR needed? Currently if a refresh command is fired on a parquet table using carbon session then carbon will list all the tables and check whether the table exists or not, then we check if the schema file exists or not by listing the Metadata folder. This can be a problem in cloud scenarios as the listing on S3 is slow. What changes were proposed in this PR? get the metadata for the specified table, Then go for table listing only if the provider is carbon or the table is not registered in hive Does this PR introduce any user interface change? No Is any new testcase added? No This closes #3581 --- .../apache/carbondata/spark/util/CommonUtil.scala | 10 +++++++ .../apache/spark/sql/hive/CarbonSessionUtil.scala | 26 +++++++++--------- .../management/RefreshCarbonTableCommand.scala | 31 +++++++++++++--------- .../spark/sql/execution/strategy/DDLStrategy.scala | 16 +++++------ .../spark/sql/hive/CarbonFileMetastore.scala | 18 +++++-------- 5 files changed, 53 insertions(+), 48 deletions(-) diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index f0fe08b..e70fc24 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.spark.{SparkContext, SparkEnv} import 
org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField} import org.apache.spark.util.FileUtils @@ -832,4 +833,13 @@ object CommonUtil { } displaySize } + + def isCarbonDataSource(catalogTable: CatalogTable): Boolean = { + catalogTable.provider match { + case Some(x) => x.equalsIgnoreCase("org.apache.spark.sql.CarbonSource") || + x.equalsIgnoreCase("carbondata") + case None => false + } + } + } diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala index e3f1d3f..968738a 100644 --- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala +++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala @@ -35,6 +35,8 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.carbondata.spark.util.CommonUtil + /** * This class refresh the relation from cache if the carbontable in * carbon catalog is not same as cached carbon relation's carbon table. @@ -59,20 +61,16 @@ object CarbonSessionUtil { * Set the stats to none in case of carbontable */ def setStatsNone(catalogTable: CatalogTable): Unit = { - catalogTable.provider match { - case Some(provider) - if provider.equals("org.apache.spark.sql.CarbonSource") || - provider.equalsIgnoreCase("carbondata") => - // Update stats to none in case of carbon table as we are not expecting any stats from - // Hive. Hive gives wrong stats for carbon table. 
- catalogTable.stats match { - case Some(stats) => - CarbonReflectionUtils.setFieldToCaseClass(catalogTable, "stats", None) - case _ => - } - isRelationRefreshed = - CarbonEnv.isRefreshRequired(catalogTable.identifier)(sparkSession) - case _ => + if (CommonUtil.isCarbonDataSource(catalogTable)) { + // Update stats to none in case of carbon table as we are not expecting any stats from + // Hive. Hive gives wrong stats for carbon table. + catalogTable.stats match { + case Some(stats) => + CarbonReflectionUtils.setFieldToCaseClass(catalogTable, "stats", None) + case _ => + } + isRelationRefreshed = + CarbonEnv.isRefreshRequired(catalogTable.identifier)(sparkSession) } } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala index 9251cf0..17e628f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala @@ -24,6 +24,7 @@ import scala.collection.mutable import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, MetadataCommand} import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand @@ -41,6 +42,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, 
RefreshTablePreExecutionEvent} +import org.apache.carbondata.spark.util.CommonUtil /** * Command to register carbon table from existing carbon table data @@ -52,24 +54,31 @@ case class RefreshCarbonTableCommand( val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) override def processMetadata(sparkSession: SparkSession): Seq[Row] = { - val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore val databaseName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) setAuditTable(databaseName, tableName) // Steps - // 1. get table path - // 2. perform the below steps - // 2.1 check if the table already register with hive then ignore and continue with the next - // schema + // 1. Get Table Metadata from spark. + // 2 Perform below steps: + // 2.1 If the table exists then check if the provider is carbon. If yes then go for carbon + // refresh otherwise no need to do anything. + // 2.1.1 If the table does not exist then consider the table as carbon and check for schema file + // existence. // 2.2 register the table with the hive check if the table being registered has aggregate table // then do the below steps // 2.2.1 validate that all the aggregate tables are copied at the store location. 
// 2.2.2 Register the aggregate tables - val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName.toLowerCase)(sparkSession) - val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName.toLowerCase) // 2.1 check if the table already register with hive then ignore and continue with the next // schema - if (!sparkSession.sessionState.catalog.listTables(databaseName) - .exists(_.table.equalsIgnoreCase(tableName))) { + val isCarbonDataSource = try { + CommonUtil.isCarbonDataSource(sparkSession.sessionState.catalog + .getTableMetadata(TableIdentifier(tableName, databaseNameOp))) + } catch { + case _: NoSuchTableException => + true + } + if (isCarbonDataSource) { + val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName.toLowerCase)(sparkSession) + val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName.toLowerCase) // check the existence of the schema file to know its a carbon table val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath) // if schema file does not exist then the table will either non carbon table or stale @@ -106,9 +115,7 @@ case class RefreshCarbonTableCommand( } } } - RefreshTable( - TableIdentifier(identifier.getTableName, Option(identifier.getDatabaseName)) - ).run(sparkSession) + RefreshTable(TableIdentifier(tableName, Option(databaseName))).run(sparkSession) } /** diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index 80d3044..68f7442 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -31,6 +31,8 @@ import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, C import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand import 
org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo} +import org.apache.carbondata.spark.util.CommonUtil /** * Carbon strategies for ddl commands @@ -147,20 +149,20 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { if isCarbonTable(truncateTable.tableName) => ExecutedCommandExec(CarbonTruncateCommand(truncateTable)) :: Nil case createta...@org.apache.spark.sql.execution.datasources.CreateTable(_, _, None) - if isCarbonDataSourceTable(createTable.tableDesc) => + if CommonUtil.isCarbonDataSource(createTable.tableDesc) => ExecutedCommandExec(DDLHelper.createDataSourceTable(createTable, sparkSession)) :: Nil case MatchCreateDataSourceTable(tableDesc, mode, query) - if isCarbonDataSourceTable(tableDesc) => + if CommonUtil.isCarbonDataSource(tableDesc) => ExecutedCommandExec( DDLHelper.createDataSourceTableAsSelect(tableDesc, query, mode, sparkSession) ) :: Nil case org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, query) - if isCarbonDataSourceTable(tableDesc) => + if CommonUtil.isCarbonDataSource(tableDesc) => ExecutedCommandExec( DDLHelper.createDataSourceTableAsSelect(tableDesc, query.get, mode, sparkSession) ) :: Nil case createTable@CreateDataSourceTableCommand(table, _) - if isCarbonDataSourceTable(table) => + if CommonUtil.isCarbonDataSource(table) => ExecutedCommandExec( DDLHelper.createDataSourceTable(createTable, sparkSession) ) :: Nil @@ -195,12 +197,6 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { CarbonPlanHelper.isCarbonTable(tableIdent, sparkSession) } - private def isCarbonDataSourceTable(table: CatalogTable): Boolean = { - table.provider.get != DDLUtils.HIVE_PROVIDER && - (table.provider.get.equals("org.apache.spark.sql.CarbonSource") || - table.provider.get.equalsIgnoreCase("carbondata")) - } - private def isCarbonHiveTable(table: CatalogTable): Boolean = { table.provider.isDefined && 
DDLUtils.HIVE_PROVIDER == table.provider.get && diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 6c7b1f2..15b2a51 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -52,7 +52,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.core.writer.ThriftWriter import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} -import org.apache.carbondata.spark.util.CarbonSparkUtil +import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil} case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) { // use to lock the carbonTables @@ -216,12 +216,9 @@ class CarbonFileMetastore extends CarbonMetaStore { "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) => val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable] - catalogTable.provider match { - case Some(name) if (name.equals("org.apache.spark.sql.CarbonSource") - || name.equalsIgnoreCase("carbondata")) => name - case _ => - CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table) - throw new NoSuchTableException(database, tableIdentifier.table) + if (!CommonUtil.isCarbonDataSource(catalogTable)) { + CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table) + throw new NoSuchTableException(database, tableIdentifier.table) } val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from( catalogTable.location.toString, database, tableIdentifier.table) @@ -540,11 +537,8 @@ class CarbonFileMetastore extends CarbonMetaStore { 
"org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) => val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable] - catalogTable.provider match { - case Some(name) if (name.equals("org.apache.spark.sql.CarbonSource") - || name.equalsIgnoreCase("carbondata")) => name - case _ => - throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table) + if (!CommonUtil.isCarbonDataSource(catalogTable)) { + throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table) } val tableLocation = catalogTable.storage.locationUri match { case tableLoc@Some(uri) =>