Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/18421#discussion_r128269600
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -1028,25 +994,115 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     currentFullPath
   }
+  private def statsToProperties(
+      stats: CatalogStatistics,
+      schema: StructType): Map[String, String] = {
+
+    var statsProperties: Map[String, String] =
+      Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
+    if (stats.rowCount.isDefined) {
+      statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()
+    }
+
+    val colNameTypeMap: Map[String, DataType] =
+      schema.fields.map(f => (f.name, f.dataType)).toMap
+    stats.colStats.foreach { case (colName, colStat) =>
+      colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
+        statsProperties += (columnStatKeyPropName(colName, k) -> v)
+      }
+    }
+
+    statsProperties
+  }
+
+  private def statsFromProperties(
+      properties: Map[String, String],
+      table: String,
+      schema: StructType): Option[CatalogStatistics] = {
+
+    val statsProps = properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
+    if (statsProps.isEmpty) {
+      None
+    } else {
+
+      val colStats = new mutable.HashMap[String, ColumnStat]
+
+      // For each column, recover its column stats. Note that this is currently an O(n^2)
+      // operation, but since the number of columns is usually not enormous, this is
+      // probably OK as a start. To make it linear, we would need a stronger contract on
+      // the naming convention used for serialization.
+      schema.foreach { field =>
+        if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) {
+          // If "version" field is defined, then the column stat is defined.
+          val keyPrefix = columnStatKeyPropName(field.name, "")
+          val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) =>
+            (k.drop(keyPrefix.length), v)
+          }
+
+          ColumnStat.fromMap(table, field, colStatMap).foreach {
+            colStat => colStats += field.name -> colStat
+          }
+        }
+      }
+
+      Some(CatalogStatistics(
+        sizeInBytes = BigInt(statsProps(STATISTICS_TOTAL_SIZE)),
+        rowCount = statsProps.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
+        colStats = colStats.toMap))
+    }
+  }
+
   override def alterPartitions(
       db: String,
       table: String,
       newParts: Seq[CatalogTablePartition]): Unit = withClient {
     val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+
+    val rawTable = getRawTable(db, table)
+
+    // Convert partition statistics to properties so that we can persist them through the Hive API.
+    val withStatsProps = lowerCasedParts.map(p => {
+      if (p.stats.isDefined) {
+        val statsProperties = statsToProperties(p.stats.get, rawTable.schema)
+        p.copy(parameters = p.parameters ++ statsProperties)
+      } else {
+        p
+      }
+    })
+
     // Note: Before altering table partitions in Hive, you *must* set the current database
     // to the one that contains the table of interest. Otherwise you will end up with the
     // most helpful error message ever: "Unable to alter partition. alter is not possible."
     // See HIVE-2742 for more detail.
     client.setCurrentDatabase(db)
-    client.alterPartitions(db, table, lowerCasedParts)
+    client.alterPartitions(db, table, withStatsProps)
   }
   override def getPartition(
       db: String,
       table: String,
       spec: TablePartitionSpec): CatalogTablePartition = withClient {
     val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
-    part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+    restorePartitionMetadata(part, getTable(db, table))
+  }
+
+  private def restorePartitionMetadata(
+      partition: CatalogTablePartition,
+      table: CatalogTable): CatalogTablePartition = {
+    val restoredSpec = restorePartitionSpec(partition.spec, table.partitionColumnNames)
+
+    // construct Spark's statistics from information in Hive metastore
--- End diff --
Here we only respect Spark's own statistics. Please clarify that in this comment as well.
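To illustrate the point, a minimal standalone sketch, not code from this PR; it assumes `STATISTICS_PREFIX` is `"spark.sql.statistics."` as defined in the `HiveExternalCatalog` object. `statsFromProperties` keeps only keys carrying that prefix, so Hive's native parameters are skipped:

```scala
// Sketch only: shows why statsFromProperties sees just Spark-written stats.
object SparkStatsFilterSketch {
  val STATISTICS_PREFIX = "spark.sql.statistics."

  def main(args: Array[String]): Unit = {
    // Partition parameters as they might sit in the Hive metastore: a mix of
    // Hive's native stats and the Spark-written, prefixed ones.
    val params = Map(
      "totalSize" -> "8192",                      // Hive native: ignored
      "numRows" -> "100",                         // Hive native: ignored
      s"${STATISTICS_PREFIX}totalSize" -> "8192", // Spark-written: respected
      s"${STATISTICS_PREFIX}numRows" -> "100")    // Spark-written: respected

    // The same prefix filter as in statsFromProperties above.
    val statsProps = params.filterKeys(_.startsWith(STATISTICS_PREFIX))
    statsProps.foreach { case (k, v) => println(s"$k -> $v") }
    // Prints only the two spark.sql.statistics.* entries.
  }
}
```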
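Likewise, a hedged sketch of the per-column key convention the two helpers rely on; the `colStats` key layout is assumed from `columnStatKeyPropName`, and the column and stat names here are illustrative:

```scala
// Sketch only: round-trips one column's stats through flat property keys.
object ColumnStatKeySketch {
  val STATISTICS_COL_STATS_PREFIX = "spark.sql.statistics.colStats."

  // Mirrors columnStatKeyPropName: one flat property per (column, stat) pair.
  def columnStatKeyPropName(columnName: String, statKey: String): String =
    STATISTICS_COL_STATS_PREFIX + columnName + "." + statKey

  def main(args: Array[String]): Unit = {
    // Write path (statsToProperties): serialize stats of a column "id".
    val props = Map(
      columnStatKeyPropName("id", "version") -> "1",
      columnStatKeyPropName("id", "max") -> "99")

    // Read path (statsFromProperties): strip the per-column prefix to get
    // the map handed to ColumnStat.fromMap.
    val keyPrefix = columnStatKeyPropName("id", "")
    val colStatMap = props.filterKeys(_.startsWith(keyPrefix)).map {
      case (k, v) => (k.drop(keyPrefix.length), v)
    }
    println(colStatMap) // Map(version -> 1, max -> 99)
  }
}
```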