Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18421#discussion_r132099286
  
    --- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
    @@ -1033,25 +998,126 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
         currentFullPath
       }
     
    +  private def statsToProperties(
    +      stats: CatalogStatistics,
    +      schema: StructType): Map[String, String] = {
    +
    +    var statsProperties: Map[String, String] =
    +      Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
    +    if (stats.rowCount.isDefined) {
    +      statsProperties += STATISTICS_NUM_ROWS -> 
stats.rowCount.get.toString()
    +    }
    +
    +    val colNameTypeMap: Map[String, DataType] =
    +      schema.fields.map(f => (f.name, f.dataType)).toMap
    +    stats.colStats.foreach { case (colName, colStat) =>
    +      colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, 
v) =>
    +        statsProperties += (columnStatKeyPropName(colName, k) -> v)
    +      }
    +    }
    +
    +    statsProperties
    +  }
    +
    +  private def statsFromProperties(
    +      properties: Map[String, String],
    +      table: String,
    +      schema: StructType): Option[CatalogStatistics] = {
    +
    +    val statsProps = properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
    +    if (statsProps.isEmpty) {
    +      None
    +    } else {
    +
    +      val colStats = new mutable.HashMap[String, ColumnStat]
    +
    +      // For each column, recover its column stats. Note that this is 
currently an O(n^2) operation,
    +      // but given the number of columns is usually not enormous, this is 
probably OK as a start.
    +      // If we want to make this a linear operation, we'd need a stronger 
contract between the
    +      // naming convention used for serialization.
    +      schema.foreach { field =>
    +        if (statsProps.contains(columnStatKeyPropName(field.name, 
ColumnStat.KEY_VERSION))) {
    +          // If "version" field is defined, then the column stat is 
defined.
    +          val keyPrefix = columnStatKeyPropName(field.name, "")
    +          val colStatMap = 
statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) =>
    +            (k.drop(keyPrefix.length), v)
    +          }
    +
    +          ColumnStat.fromMap(table, field, colStatMap).foreach {
    +            colStat => colStats += field.name -> colStat
    +          }
    +        }
    +      }
    +
    +      Some(CatalogStatistics(
    +        sizeInBytes = BigInt(statsProps(STATISTICS_TOTAL_SIZE)),
    +        rowCount = statsProps.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
    +        colStats = colStats.toMap))
    +    }
    +  }
    +
       override def alterPartitions(
           db: String,
           table: String,
           newParts: Seq[CatalogTablePartition]): Unit = withClient {
         val lowerCasedParts = newParts.map(p => p.copy(spec = 
lowerCasePartitionSpec(p.spec)))
    +
    +    val rawTable = getRawTable(db, table)
    +
    +    // For datasource tables and hive serde tables created by spark 2.1 or 
higher,
    +    // the data schema is stored in the table properties.
    +    val schema = restoreTableMetadata(rawTable).schema
    +
    +    // convert partition statistics to properties so that we can persist 
them through hive api
    +    val withStatsProps = lowerCasedParts.map(p => {
    +      if (p.stats.isDefined) {
    +        val statsProperties = statsToProperties(p.stats.get, schema)
    +        p.copy(parameters = p.parameters ++ statsProperties)
    +      } else {
    +        p
    +      }
    +    })
    +
         // Note: Before altering table partitions in Hive, you *must* set the 
current database
         // to the one that contains the table of interest. Otherwise you will 
end up with the
         // most helpful error message ever: "Unable to alter partition. alter 
is not possible."
         // See HIVE-2742 for more detail.
         client.setCurrentDatabase(db)
    -    client.alterPartitions(db, table, lowerCasedParts)
    +    client.alterPartitions(db, table, withStatsProps)
       }
     
       override def getPartition(
           db: String,
           table: String,
           spec: TablePartitionSpec): CatalogTablePartition = withClient {
         val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
    -    part.copy(spec = restorePartitionSpec(part.spec, getTable(db, 
table).partitionColumnNames))
    +    restorePartitionMetadata(part, getTable(db, table))
    +  }
    +
    +  /**
    +   * Restores partition metadata from the partition properties.
    +   *
    +   * Reads partition-level statistics from partition properties, puts these
    +   * into [[CatalogTablePartition#stats]] and removes these special entries
    +   * from the partition properties.
    +   */
    +  private def restorePartitionMetadata(
    +      partition: CatalogTablePartition,
    +      table: CatalogTable): CatalogTablePartition = {
    +    val restoredSpec = restorePartitionSpec(partition.spec, 
table.partitionColumnNames)
    +
    +    // Restore Spark's statistics from information in Metastore.
    --- End diff --
    
    Also leave a comment here to explain that partition-level stats are 
unavailable prior to Spark 2.3.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to