Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/21608#discussion_r200207191
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala ---
@@ -47,22 +47,34 @@ object CommandUtils extends Logging {
     }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
+
+    val sessionState = spark.sessionState
+    val serializableConfiguration = new SerializableConfiguration(sessionState.newHadoopConf())
+    val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+
     if (catalogTable.partitionColumnNames.isEmpty) {
-      calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
+      calculateLocationSize(serializableConfiguration, catalogTable.identifier,
+        catalogTable.storage.locationUri, stagingDir)
     } else {
       // Calculate table size as a sum of the visible partitions. See SPARK-21079
       val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
-      partitions.map { p =>
-        calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
-      }.sum
+      val numParallelism = Math.min(partitions.size,
+        Math.min(spark.sparkContext.defaultParallelism, 10000))
+      spark.sparkContext.parallelize(partitions, numParallelism).mapPartitions {
--- End diff ---
The direction is right, but the implementation should be improved by reusing
`InMemoryFileIndex.bulkListLeafFiles` for the distributed listing instead of
hand-rolling it with `parallelize`/`mapPartitions`.
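
For illustration, a minimal sketch of what that suggestion might look like. This is not the eventual implementation: the helper name `calculateTotalSizeViaBulkListing` is made up, the `bulkListLeafFiles(paths, hadoopConf, filter, sparkSession)` signature is assumed from the Spark 2.x-era codebase (it is `private[sql]` and has changed between releases), and the code would need to live under the `org.apache.spark.sql` package tree to see it.

```scala
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex

object BulkListingSizeSketch {

  // Hypothetical helper, sketched only to show how bulkListLeafFiles could
  // replace the parallelize/mapPartitions loop in the diff above.
  def calculateTotalSizeViaBulkListing(
      spark: SparkSession,
      catalogTable: CatalogTable): BigInt = {
    val sessionState = spark.sessionState
    val stagingDir =
      sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")

    // Same filtering intent as the diff: skip Hive staging directories/files.
    val pathFilter = new PathFilter {
      override def accept(path: Path): Boolean = !path.getName.startsWith(stagingDir)
    }

    // Locations to measure: the table root for an unpartitioned table,
    // otherwise every visible partition location (see SPARK-21079).
    val locations =
      if (catalogTable.partitionColumnNames.isEmpty) {
        catalogTable.storage.locationUri.toSeq
      } else {
        sessionState.catalog
          .listPartitions(catalogTable.identifier)
          .flatMap(_.storage.locationUri)
      }
    val paths = locations.map(new Path(_))

    // bulkListLeafFiles runs the listing as a Spark job when there are many
    // paths, so the per-partition work is distributed instead of looping on
    // the driver. Assumed signature: (paths, hadoopConf, filter, sparkSession).
    val listed = InMemoryFileIndex.bulkListLeafFiles(
      paths, sessionState.newHadoopConf(), pathFilter, spark)

    listed.flatMap { case (_, statuses) => statuses.map(s => BigInt(s.getLen)) }.sum
  }
}
```

The appeal of this direction is that the parallelism heuristics, listing-as-a-Spark-job threshold, and FileSystem handling already live in `InMemoryFileIndex`, so the command does not have to duplicate them.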