Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14618#discussion_r74552290
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -1,494 +0,0 @@
    -/*
    - * Licensed to the Apache Software Foundation (ASF) under one or more
    - * contributor license agreements.  See the NOTICE file distributed with
    - * this work for additional information regarding copyright ownership.
    - * The ASF licenses this file to You under the Apache License, Version 2.0
    - * (the "License"); you may not use this file except in compliance with
    - * the License.  You may obtain a copy of the License at
    - *
    - *    http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -
    -package org.apache.spark.sql.hive
    -
    -import scala.collection.JavaConverters._
    -
    -import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
    -import org.apache.hadoop.fs.Path
    -
    -import org.apache.spark.internal.Logging
    -import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
    -import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
    -import org.apache.spark.sql.catalyst.catalog._
    -import org.apache.spark.sql.catalyst.expressions._
    -import org.apache.spark.sql.catalyst.plans.logical._
    -import org.apache.spark.sql.catalyst.rules._
    -import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
    -import org.apache.spark.sql.execution.datasources.{Partition => _, _}
    -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
    -import org.apache.spark.sql.hive.orc.OrcFileFormat
    -import org.apache.spark.sql.types._
    -
    -
    -/**
    - * Legacy catalog for interacting with the Hive metastore.
    - *
    - * This is still used for things like creating data source tables, but in the future will be
    - * cleaned up to integrate more nicely with [[HiveExternalCatalog]].
    - */
    -private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
    -  private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
    -  private val client = sparkSession.sharedState.asInstanceOf[HiveSharedState].metadataHive
    -
    -  /** A fully qualified identifier for a table (i.e., database.tableName) */
    -  case class QualifiedTableName(database: String, name: String)
    -
    -  private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
    -
    -  def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
    -    QualifiedTableName(
    -      tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase,
    -      tableIdent.table.toLowerCase)
    -  }
    -
    -  private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
    -    QualifiedTableName(
    -      t.identifier.database.getOrElse(getCurrentDatabase).toLowerCase,
    -      t.identifier.table.toLowerCase)
    -  }
    -
    -  /** A cache of Spark SQL data source tables that have been accessed. */
    -  protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
    -    val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
    -      override def load(in: QualifiedTableName): LogicalPlan = {
    -        logDebug(s"Creating new cached data source for $in")
    -        val table = client.getTable(in.database, in.name)
    -
    -        // TODO: the following code is duplicated with FindDataSourceTable.readDataSourceTable
    -
    -        def schemaStringFromParts: Option[String] = {
    -          table.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
    -            val parts = (0 until numParts.toInt).map { index =>
    -              val part = table.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
    -              if (part == null) {
    -                throw new AnalysisException(
    -                  "Could not read schema from the metastore because it is corrupted " +
    -                    s"(missing part $index of the schema, $numParts parts are expected).")
    -              }
    -
    -              part
    -            }
    -            // Stick all parts back to a single schema string.
    -            parts.mkString
    -          }
    -        }
    -
    -        def getColumnNames(colType: String): Seq[String] = {
    -          table.properties.get(s"$DATASOURCE_SCHEMA.num${colType.capitalize}Cols").map {
    -            numCols => (0 until numCols.toInt).map { index =>
    -              table.properties.getOrElse(s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index",
    -                throw new AnalysisException(
    -                  s"Could not read $colType columns from the metastore because it is corrupted " +
    -                    s"(missing part $index of it, $numCols parts are expected)."))
    -            }
    -          }.getOrElse(Nil)
    -        }
    -
    -        // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
    -        // After SPARK-6024, we removed this flag.
    -        // Although we are not using spark.sql.sources.schema any more, we need to still support.
    -        val schemaString = table.properties.get(DATASOURCE_SCHEMA).orElse(schemaStringFromParts)
    -
    -        val userSpecifiedSchema =
    -          schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
    -
    -        // We only need names at here since userSpecifiedSchema we loaded from the metastore
    -        // contains partition columns. We can always get data types of partitioning columns
    -        // from userSpecifiedSchema.
    -        val partitionColumns = getColumnNames("part")
    -
    -        val bucketSpec = table.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { n =>
    -          BucketSpec(n.toInt, getColumnNames("bucket"), getColumnNames("sort"))
    -        }
    -
    -        val options = table.storage.properties
    -        val dataSource =
    -          DataSource(
    -            sparkSession,
    -            userSpecifiedSchema = userSpecifiedSchema,
    -            partitionColumns = partitionColumns,
    -            bucketSpec = bucketSpec,
    -            className = table.properties(DATASOURCE_PROVIDER),
    -            options = options)
    -
    -        LogicalRelation(
    -          dataSource.resolveRelation(checkPathExist = true),
    -          metastoreTableIdentifier = Some(TableIdentifier(in.name, Some(in.database))))
    -      }
    -    }
    -
    -    CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
    -  }
    -
    -  def refreshTable(tableIdent: TableIdentifier): Unit = {
    -    // refreshTable does not eagerly reload the cache. It just invalidate the cache.
    -    // Next time when we use the table, it will be populated in the cache.
    -    // Since we also cache ParquetRelations converted from Hive Parquet tables and
    -    // adding converted ParquetRelations into the cache is not defined in the load function
    -    // of the cache (instead, we add the cache entry in convertToParquetRelation),
    -    // it is better at here to invalidate the cache to avoid confusing waring logs from the
    -    // cache loader (e.g. cannot find data source provider, which is only defined for
    -    // data source table.).
    -    cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
    -  }
    -
    -  def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
    -    // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
    -    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
    -    new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString
    -  }
    -
    -  def lookupRelation(
    -      tableIdent: TableIdentifier,
    -      alias: Option[String]): LogicalPlan = {
    -    val qualifiedTableName = getQualifiedTableName(tableIdent)
    -    val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name)
    -
    -    if (table.properties.get(DATASOURCE_PROVIDER).isDefined) {
    -      val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
    -      val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable)
    -      // Then, if alias is specified, wrap the table with a Subquery using the alias.
    -      // Otherwise, wrap the table with a Subquery using the table name.
    -      alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
    -    } else if (table.tableType == CatalogTableType.VIEW) {
    -      val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
    -      alias match {
    -        case None =>
    -          SubqueryAlias(table.identifier.table,
    -            sparkSession.sessionState.sqlParser.parsePlan(viewText))
    -        case Some(aliasText) =>
    -          SubqueryAlias(aliasText, sessionState.sqlParser.parsePlan(viewText))
    -      }
    -    } else {
    -      val qualifiedTable =
    -        MetastoreRelation(
    -          qualifiedTableName.database, qualifiedTableName.name)(table, client, sparkSession)
    -      alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
    -    }
    -  }
    -
    -  private def getCached(
    -      tableIdentifier: QualifiedTableName,
    -      pathsInMetastore: Seq[String],
    -      metastoreRelation: MetastoreRelation,
    -      schemaInMetastore: StructType,
    -      expectedFileFormat: Class[_ <: FileFormat],
    -      expectedBucketSpec: Option[BucketSpec],
    -      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
    -
    -    cachedDataSourceTables.getIfPresent(tableIdentifier) match {
    -      case null => None // Cache miss
    -      case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
    -        val cachedRelationFileFormatClass = relation.fileFormat.getClass
    -
    -        expectedFileFormat match {
    -          case `cachedRelationFileFormatClass` =>
    -            // If we have the same paths, same schema, and same partition spec,
    -            // we will use the cached relation.
    -            val useCached =
    -              relation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet &&
    -                logical.schema.sameType(schemaInMetastore) &&
    -                relation.bucketSpec == expectedBucketSpec &&
    -                relation.partitionSpec == partitionSpecInMetastore.getOrElse {
    -                  PartitionSpec(StructType(Nil), Array.empty[PartitionDirectory])
    -                }
    -
    -            if (useCached) {
    -              Some(logical)
    -            } else {
    -              // If the cached relation is not updated, we invalidate it right away.
    -              cachedDataSourceTables.invalidate(tableIdentifier)
    -              None
    -            }
    -          case _ =>
    -            logWarning(
    -              s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} " +
    -                s"should be stored as $expectedFileFormat. However, we are getting " +
    -                s"a ${relation.fileFormat} from the metastore cache. This cached " +
    -                s"entry will be invalidated.")
    -            cachedDataSourceTables.invalidate(tableIdentifier)
    -            None
    -        }
    -      case other =>
    -        logWarning(
    -          s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
    -            s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. " +
    -            s"This cached entry will be invalidated.")
    -        cachedDataSourceTables.invalidate(tableIdentifier)
    -        None
    -    }
    -  }
    -
    -  private def convertToLogicalRelation(
    -      metastoreRelation: MetastoreRelation,
    -      options: Map[String, String],
    -      defaultSource: FileFormat,
    -      fileFormatClass: Class[_ <: FileFormat],
    -      fileType: String): LogicalRelation = {
    -    val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
    -    val tableIdentifier =
    -      QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
    -    val bucketSpec = None  // We don't support hive bucketed tables, only ones we write out.
    -
    -    val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
    -      val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
    -      val partitionColumnDataTypes = partitionSchema.map(_.dataType)
    -      // We're converting the entire table into HadoopFsRelation, so predicates to Hive metastore
    -      // are empty.
    -      val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
    -        val location = p.getLocation
    -        val values = InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map {
    -          case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
    -        })
    -        PartitionDirectory(values, location)
    -      }
    -      val partitionSpec = PartitionSpec(partitionSchema, partitions)
    -      val partitionPaths = partitions.map(_.path.toString)
    -
    -      // By convention (for example, see MetaStorePartitionedTableFileCatalog), the definition of a
    -      // partitioned table's paths depends on whether that table has any actual partitions.
    -      // Partitioned tables without partitions use the location of the table's base path.
    -      // Partitioned tables with partitions use the locations of those partitions' data locations,
    -      // _omitting_ the table's base path.
    -      val paths = if (partitionPaths.isEmpty) {
    -        Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
    -      } else {
    -        partitionPaths
    -      }
    -
    -      val cached = getCached(
    -        tableIdentifier,
    -        paths,
    -        metastoreRelation,
    -        metastoreSchema,
    -        fileFormatClass,
    -        bucketSpec,
    -        Some(partitionSpec))
    -
    -      val hadoopFsRelation = cached.getOrElse {
    -        val fileCatalog = new MetaStorePartitionedTableFileCatalog(
    -          sparkSession,
    -          new Path(metastoreRelation.catalogTable.storage.locationUri.get),
    -          partitionSpec)
    -
    -        val inferredSchema = if (fileType.equals("parquet")) {
    -          val inferredSchema =
    -            defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())
    -          inferredSchema.map { inferred =>
    -            ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
    -          }.getOrElse(metastoreSchema)
    -        } else {
    -          defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get
    -        }
    -
    -        val relation = HadoopFsRelation(
    -          sparkSession = sparkSession,
    -          location = fileCatalog,
    -          partitionSchema = partitionSchema,
    -          dataSchema = inferredSchema,
    -          bucketSpec = bucketSpec,
    -          fileFormat = defaultSource,
    -          options = options)
    -
    -        val created = LogicalRelation(
    -          relation,
    -          metastoreTableIdentifier =
    -            Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database))))
    -        cachedDataSourceTables.put(tableIdentifier, created)
    -        created
    -      }
    -
    -      hadoopFsRelation
    -    } else {
    -      val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
    -
    -      val cached = getCached(tableIdentifier,
    -        paths,
    -        metastoreRelation,
    -        metastoreSchema,
    -        fileFormatClass,
    -        bucketSpec,
    -        None)
    -      val logicalRelation = cached.getOrElse {
    -        val created =
    -          LogicalRelation(
    -            DataSource(
    -              sparkSession = sparkSession,
    -              paths = paths,
    -              userSpecifiedSchema = Some(metastoreRelation.schema),
    -              bucketSpec = bucketSpec,
    -              options = options,
    -              className = fileType).resolveRelation(),
    -              metastoreTableIdentifier =
    -                Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database))))
    -
    -
    -        cachedDataSourceTables.put(tableIdentifier, created)
    -        created
    -      }
    -
    -      logicalRelation
    -    }
    -    result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
    -  }
    -
    -  /**
    -   * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
    -   * data source relations for better performance.
    -   */
    -  object ParquetConversions extends Rule[LogicalPlan] {
    -    private def shouldConvertMetastoreParquet(relation: MetastoreRelation): Boolean = {
    -      relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") &&
    -        sessionState.convertMetastoreParquet
    -    }
    -
    -    private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = {
    -      val defaultSource = new ParquetFileFormat()
    -      val fileFormatClass = classOf[ParquetFileFormat]
    -
    -      val mergeSchema = sessionState.convertMetastoreParquetWithSchemaMerging
    -      val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString)
    -
    -      convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "parquet")
    -    }
    -
    -    override def apply(plan: LogicalPlan): LogicalPlan = {
    -      if (!plan.resolved || plan.analyzed) {
    -        return plan
    -      }
    -
    -      plan transformUp {
    -        // Write path
    -        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
    -          // Inserting into partitioned table is not supported in Parquet data source (yet).
    -          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
    -          InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)
    -
    -        // Read path
    -        case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
    -          val parquetRelation = convertToParquetRelation(relation)
    -          SubqueryAlias(relation.tableName, parquetRelation)
    -      }
    -    }
    -  }
    -
    -  /**
    -   * When scanning Metastore ORC tables, convert them to ORC data source relations
    -   * for better performance.
    -   */
    -  object OrcConversions extends Rule[LogicalPlan] {
    -    private def shouldConvertMetastoreOrc(relation: MetastoreRelation): Boolean = {
    -      relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") &&
    -        sessionState.convertMetastoreOrc
    -    }
    -
    -    private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = {
    -      val defaultSource = new OrcFileFormat()
    -      val fileFormatClass = classOf[OrcFileFormat]
    -      val options = Map[String, String]()
    -
    -      convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc")
    -    }
    -
    -    override def apply(plan: LogicalPlan): LogicalPlan = {
    -      if (!plan.resolved || plan.analyzed) {
    -        return plan
    -      }
    -
    -      plan transformUp {
    -        // Write path
    -        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
    -          // Inserting into partitioned table is not supported in Orc data source (yet).
    -          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
    -          InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)
    -
    -        // Read path
    -        case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
    -          val orcRelation = convertToOrcRelation(relation)
    -          SubqueryAlias(relation.tableName, orcRelation)
    -      }
    -    }
    -  }
    -
    -  /**
    -   * Creates any tables required for query execution.
    -   * For example, because of a CREATE TABLE X AS statement.
    -   */
    -  object CreateTables extends Rule[LogicalPlan] {
    --- End diff ---
    
    Moved to `HiveStrategies.scala`. See the new [location](https://github.com/gatorsmile/spark/blob/9fe620567aa7d61038ef497bf2358e6fff374d38/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala#L94)
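    
    For readers skimming the removed file above: `cachedDataSourceTables` was built on Guava's `LoadingCache`, so a lookup populates the cache on a miss and `refreshTable` only has to call `invalidate`. Below is a minimal, self-contained sketch of that pattern; the key and value types (and the `LoadingCacheSketch` name) are simplified stand-ins for illustration, not the real Spark classes.
    
    ```scala
    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
    
    // Simplified stand-ins; the removed code cached LogicalPlans keyed by table name.
    case class QualifiedTableName(database: String, name: String)
    
    object LoadingCacheSketch extends App {
      val cache: LoadingCache[QualifiedTableName, String] =
        CacheBuilder.newBuilder()
          .maximumSize(1000) // same bound as the removed cachedDataSourceTables
          .build(new CacheLoader[QualifiedTableName, String]() {
            // Runs only on a cache miss; the result is stored for later lookups.
            override def load(key: QualifiedTableName): String =
              s"resolved plan for ${key.database}.${key.name}"
          })
    
      println(cache.get(QualifiedTableName("default", "t1")))  // loads and caches
      cache.invalidate(QualifiedTableName("default", "t1"))    // what refreshTable relies on
    }
    ```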
 


