Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73780357
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: 
$tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +    Seq.empty[Row]
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new 
ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      filter: PathFilter,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
    +    if (partitionNames.length == 0) {
    +      return Seq(spec -> path)
    +    }
    +
    +    val statuses = fs.listStatus(path)
    +    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
    +    val statusPar: GenSeq[FileStatus] =
    +      if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
    +        val parArray = statuses.par
    +        parArray.tasksupport = evalTaskSupport
    +        parArray
    +      } else {
    +        statuses
    +      }
    +    statusPar.flatMap { st =>
    +      val name = st.getPath.getName
    +      if (st.isDirectory && name.contains("=")) {
    +        val ps = name.split("=", 2)
    +        val columnName = 
PartitioningUtils.unescapePathName(ps(0)).toLowerCase
    +        val value = PartitioningUtils.unescapePathName(ps(1))
    --- End diff --
    
    Do we need to check whether the value is valid? E.g., for a partition column "a" 
of IntegerType, "a=abc" is invalid.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to