[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21608 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r208423190

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala ---
    @@ -49,4 +51,11 @@ object DataSourceUtils {
           }
         }
       }
    +
    +  // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
    +  // counted as data files, so that they shouldn't participate partition discovery.
    +  private[sql] def isDataPath(path: Path): Boolean = {
    +    val name = path.getName
    +    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
    --- End diff --

    Sounds good. I have made the changes.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r208408858

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala ---
    @@ -49,4 +51,11 @@ object DataSourceUtils {
           }
         }
       }
    +
    +  // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
    +  // counted as data files, so that they shouldn't participate partition discovery.
    +  private[sql] def isDataPath(path: Path): Boolean = {
    +    val name = path.getName
    +    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
    --- End diff --

    Not sure what your earlier implementation was. I would prefer to keep the original code in `PartitioningAwareFileIndex.scala` unchanged and just add a utility function `isDataPath` in CommandUtils.scala. Does this sound good to you?
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r208295767

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala ---
    @@ -49,4 +51,11 @@ object DataSourceUtils {
           }
         }
       }
    +
    +  // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
    +  // counted as data files, so that they shouldn't participate partition discovery.
    +  private[sql] def isDataPath(path: Path): Boolean = {
    +    val name = path.getName
    +    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
    --- End diff --

    Should I use the earlier implementation with a simple if condition?
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r208269963

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala ---
    @@ -49,4 +51,11 @@ object DataSourceUtils {
           }
         }
       }
    +
    +  // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
    +  // counted as data files, so that they shouldn't participate partition discovery.
    +  private[sql] def isDataPath(path: Path): Boolean = {
    +    val name = path.getName
    +    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
    --- End diff --

    Let us not use the same one as `PartitioningAwareFileIndex.scala`. In this case, whether a file is a data file is not related to the condition `name.contains("=")`.
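To make the point about `name.contains("=")` concrete, here is a minimal sketch that compares the predicate from the diff with a simpler variant. It works on bare file names rather than Hadoop `Path` objects so that it runs without dependencies. The object name `DataPathSketch`, the method `isDataPathForSize`, and the sample file names are assumptions made for illustration; they are not taken from the pull request.

```scala
object DataPathSketch {
  // The predicate shown in the diff, applied to a bare file name
  // (the original takes a Hadoop Path and calls path.getName).
  def isDataPathWithPartitionCheck(name: String): Boolean =
    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))

  // Hypothetical simpler variant: no special case for "=" in the name.
  def isDataPathForSize(name: String): Boolean =
    !(name.startsWith("_") || name.startsWith("."))

  def main(args: Array[String]): Unit = {
    // Illustrative names only; "_col=1" looks like a partition directory
    // whose column name starts with an underscore.
    val names = Seq("part-00000.parquet", "_SUCCESS", "_metadata", ".hive-staging_1", "_col=1")
    names.foreach { n =>
      println(s"$n -> withPartitionCheck=${isDataPathWithPartitionCheck(n)}, " +
        s"forSize=${isDataPathForSize(n)}")
    }
  }
}
```

The two predicates differ only for names that start with `_` and also contain `=`, which is the partition-discovery case the `=` clause exists for.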
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r207091637

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1449,6 +1449,15 @@ object SQLConf {
           .intConf
           .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
           .createWithDefault(Deflater.DEFAULT_COMPRESSION)
    +
    +  val PARALLEL_FILE_LISTING_IN_COMMMANDS =
    +    buildConf("spark.sql.parallelFileListingInCommands.enabled")
    --- End diff --

    It's OK to follow Xiao's decision.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r207090891 --- Diff: docs/sql-programming-guide.md --- @@ -1872,6 +1872,8 @@ working with timestamps in `pandas_udf`s to get the best performance, see - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files. - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior. - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". 
In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`. + - Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.execution.computeStatsListFilesInParallel` to `False`. + - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation. --- End diff -- ah, ok. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r207090805 --- Diff: docs/sql-programming-guide.md --- @@ -1872,6 +1872,8 @@ working with timestamps in `pandas_udf`s to get the best performance, see - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files. - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior. - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". 
In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`. + - Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.execution.computeStatsListFilesInParallel` to `False`. --- End diff -- Same as below. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r207090543

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1449,6 +1449,15 @@ object SQLConf {
           .intConf
           .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
           .createWithDefault(Deflater.DEFAULT_COMPRESSION)
    +
    +  val PARALLEL_FILE_LISTING_IN_COMMMANDS =
    +    buildConf("spark.sql.parallelFileListingInCommands.enabled")
    --- End diff --

    I can make this change.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r207090467 --- Diff: docs/sql-programming-guide.md --- @@ -1872,6 +1872,8 @@ working with timestamps in `pandas_udf`s to get the best performance, see - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files. - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior. - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". 
In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`. + - Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.execution.computeStatsListFilesInParallel` to `False`. + - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation. --- End diff -- @maropu The calculateTotalSize flow is invoked in other places too. For eg, it is used in updateTableStats method which is called from a few places. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r207074028

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1449,6 +1449,15 @@ object SQLConf {
           .intConf
           .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
           .createWithDefault(Deflater.DEFAULT_COMPRESSION)
    +
    +  val PARALLEL_FILE_LISTING_IN_COMMMANDS =
    +    buildConf("spark.sql.parallelFileListingInCommands.enabled")
    --- End diff --

    Currently, in master, this listing happens only in the ANALYZE SQL command, so this name fits the case now. But if we add a new interface for the listing in the future (e.g., `SparkSession.analyzeTable`, `SparkSession.analyzeColumn`, ...), might this name confuse users a little? So, how about `spark.sql.parallelFileListingInStatsComputation.enabled`? @gatorsmile
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r207071540 --- Diff: docs/sql-programming-guide.md --- @@ -1872,6 +1872,8 @@ working with timestamps in `pandas_udf`s to get the best performance, see - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files. - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior. - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". 
In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`. + - Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.execution.computeStatsListFilesInParallel` to `False`. + - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation. --- End diff -- nit: `... during Statistics computation in the ANALYZE command.`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r207071484 --- Diff: docs/sql-programming-guide.md --- @@ -1872,6 +1872,8 @@ working with timestamps in `pandas_udf`s to get the best performance, see - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files. - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior. - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". 
In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`. + - Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.execution.computeStatsListFilesInParallel` to `False`. --- End diff -- nit: `File listing for compute statistics in the ANALYZE command...`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r207035320

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala ---
    @@ -78,7 +93,8 @@ object CommandUtils extends Logging {
           val size = if (fileStatus.isDirectory) {
             fs.listStatus(path)
               .map { status =>
    -            if (!status.getPath.getName.startsWith(stagingDir)) {
    +            if (!status.getPath.getName.startsWith(stagingDir) &&
    +              DataSourceUtils.isDataPath(path)) {
    --- End diff --

    Added a line to the migration doc.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r207035227

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1449,6 +1449,13 @@ object SQLConf {
           .intConf
           .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
           .createWithDefault(Deflater.DEFAULT_COMPRESSION)
    +
    +  val COMPUTE_STATS_LIST_FILES_IN_PARALLEL =
    +    buildConf("spark.sql.execution.computeStatsListFilesInParallel")
    +      .internal()
    +      .doc("If True, File listing for compute statistics is done in parallel.")
    --- End diff --

    Thanks! I have made this change.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r207035140

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1449,6 +1449,13 @@ object SQLConf {
           .intConf
           .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
           .createWithDefault(Deflater.DEFAULT_COMPRESSION)
    +
    +  val COMPUTE_STATS_LIST_FILES_IN_PARALLEL =
    +    buildConf("spark.sql.execution.computeStatsListFilesInParallel")
    --- End diff --

    Done.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r207020824

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala ---
    @@ -78,7 +93,8 @@ object CommandUtils extends Logging {
           val size = if (fileStatus.isDirectory) {
             fs.listStatus(path)
               .map { status =>
    -            if (!status.getPath.getName.startsWith(stagingDir)) {
    +            if (!status.getPath.getName.startsWith(stagingDir) &&
    +              DataSourceUtils.isDataPath(path)) {
    --- End diff --

    This is also a behavior change. Could you document it too?
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r207017248

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1449,6 +1449,13 @@ object SQLConf {
           .intConf
           .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
           .createWithDefault(Deflater.DEFAULT_COMPRESSION)
    +
    +  val COMPUTE_STATS_LIST_FILES_IN_PARALLEL =
    +    buildConf("spark.sql.execution.computeStatsListFilesInParallel")
    +      .internal()
    +      .doc("If True, File listing for compute statistics is done in parallel.")
    --- End diff --

    When true, SQL commands use parallel file listing, as opposed to single thread listing. This usually speeds up commands that need to list many directories.
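As a rough illustration of the difference between single-threaded and parallel listing described in that doc string, the sketch below sums file sizes across several local directories, first one after another and then with one `Future` per directory. This is not how Spark performs the listing; it is a local-filesystem stand-in, and the names `ListingSketch`, `dirSize`, `sequentialSize`, and `parallelSize` are hypothetical.

```scala
import java.io.File
import java.nio.file.Files
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ListingSketch {
  // Sum the sizes of the regular files directly inside one directory.
  // listFiles() returns null for a non-directory, hence the Option wrapper.
  def dirSize(dir: File): Long =
    Option(dir.listFiles()).getOrElse(Array.empty[File]).filter(_.isFile).map(_.length()).sum

  // Single-threaded: visit the directories one after another.
  def sequentialSize(dirs: Seq[File]): Long = dirs.map(dirSize).sum

  // Parallel: start one Future per directory, then combine the results.
  def parallelSize(dirs: Seq[File]): Long =
    Await.result(Future.sequence(dirs.map(d => Future(dirSize(d)))), 1.minute).sum

  def main(args: Array[String]): Unit = {
    // Build a few throwaway "partition" directories with one file each.
    val root = Files.createTempDirectory("listing-sketch").toFile
    val dirs = (1 to 4).map { i =>
      val d = new File(root, s"part=$i")
      d.mkdir()
      Files.write(new File(d, "data.bin").toPath, Array.fill[Byte](i * 10)(0))
      d
    }
    println(s"sequential=${sequentialSize(dirs)} parallel=${parallelSize(dirs)}")
  }
}
```

Both functions return the same total; the parallel form only pays off when there are many directories and each listing call is slow, which matches the "list many directories" wording in the suggested doc string.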
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r207017122

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1449,6 +1449,13 @@ object SQLConf {
           .intConf
           .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
           .createWithDefault(Deflater.DEFAULT_COMPRESSION)
    +
    +  val COMPUTE_STATS_LIST_FILES_IN_PARALLEL =
    +    buildConf("spark.sql.execution.computeStatsListFilesInParallel")
    --- End diff --

    How about `spark.sql.parallelFileListingInCommands.enabled`?
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r205916878

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala ---
    @@ -47,15 +48,23 @@ object CommandUtils extends Logging {
         }
       }

    -  def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = {
    +  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
    +    val sessionState = spark.sessionState
         if (catalogTable.partitionColumnNames.isEmpty) {
           calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
         } else {
           // Calculate table size as a sum of the visible partitions. See SPARK-21079
           val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
    -      partitions.map { p =>
    -        calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
    -      }.sum
    +      val paths = partitions.map(x => new Path(x.storage.locationUri.get))
    +      val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
    +      val pathFilter = new PathFilter with Serializable {
    +        override def accept(path: Path): Boolean = {
    +          DataSourceUtils.isDataPath(path) && !path.getName.startsWith(stagingDir)
    --- End diff --

    This is a private method in PartitioningAwareFileIndex. Should I add this change to the migration guide?
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r205916762 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,23 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) --- End diff -- I have made the changes for this in calculateLocationSize. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r205546260 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,23 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) --- End diff -- Got it, maybe we can add the filter inside calculateLocationSize. Let me know if that's okay, I can make the changes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r205545427 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,23 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) --- End diff -- Let us make them consistent. only count the data files. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r205545098 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,23 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) --- End diff -- If not, we have to make changes in all those places too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r205544840

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala ---
    @@ -47,15 +48,23 @@ object CommandUtils extends Logging {
         }
       }

    -  def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = {
    +  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
    +    val sessionState = spark.sessionState
         if (catalogTable.partitionColumnNames.isEmpty) {
           calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
    --- End diff --

    This filtering of temp files is not done in other places where calculateLocationSize is called directly (e.g. AlterTableAddPartition). Should we skip checking for the temp files, since they don't make much difference to the total size?
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r205542099 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,23 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) --- End diff -- It sounds like we are not following the same file filtering for non-partition tables. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r205539125 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,23 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) } else { // Calculate table size as a sum of the visible partitions. See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) - partitions.map { p => -calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri) - }.sum + val paths = partitions.map(x => new Path(x.storage.locationUri.get)) + val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") + val pathFilter = new PathFilter with Serializable { +override def accept(path: Path): Boolean = { + DataSourceUtils.isDataPath(path) && !path.getName.startsWith(stagingDir) --- End diff -- This is a behavior change. Could you document the changes in the migration guide?
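As an illustration of the filter discussed in this comment, here is a minimal sketch that applies the same checks to bare file names instead of Hadoop `Path` objects. The object name `DataPathSketch` and the helpers `isDataName` and `accept` are hypothetical; the predicate itself is copied from the `isDataPath` helper quoted earlier in this thread.

```scala
object DataPathSketch {
  // Same predicate as the isDataPath helper in the diff, applied to a file name:
  // names starting with "_" (unless they contain "=") or with "." are not data.
  def isDataName(name: String): Boolean =
    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))

  // Combined check from the anonymous PathFilter: a data name that does not
  // start with the configured staging prefix. `stagingDir` is passed in here;
  // in the diff it comes from the "hive.exec.stagingdir" setting.
  def accept(name: String, stagingDir: String): Boolean =
    isDataName(name) && !name.startsWith(stagingDir)
}
```

Under these assumptions, the default prefix `.hive-staging` is already rejected by the leading-dot rule, so the extra staging check only matters when `hive.exec.stagingdir` is set to a prefix that `isDataName` would otherwise accept.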
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r204823567 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,26 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) } else { // Calculate table size as a sum of the visible partitions. See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) - partitions.map { p => -calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri) - }.sum + val paths = partitions.map(x => new Path(x.storage.locationUri.get)) + val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") + val pathFilter = new PathFilter with Serializable { +override def accept(path: Path): Boolean = { + val fileName = path.getName + (// Ignore metadata files starting with "_" +DataSourceUtils.isDataPath(path) && !fileName.startsWith(stagingDir)) --- End diff -- Fixed.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r204823533 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,26 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) } else { // Calculate table size as a sum of the visible partitions. See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) - partitions.map { p => -calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri) - }.sum + val paths = partitions.map(x => new Path(x.storage.locationUri.get)) + val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") + val pathFilter = new PathFilter with Serializable { +override def accept(path: Path): Boolean = { + val fileName = path.getName + (// Ignore metadata files starting with "_" +DataSourceUtils.isDataPath(path) && !fileName.startsWith(stagingDir)) +} + } + val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths, +sessionState.newHadoopConf(), pathFilter, +spark).flatMap(_._2) + fileStatusSeq.map(_.getLen).sum --- End diff -- Fixed.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r204823422 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala --- @@ -148,6 +148,19 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + test("verify table size calculation is accurate") { --- End diff -- Done.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r204823485 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala --- @@ -59,14 +59,15 @@ abstract class PartitioningAwareFileIndex( override def listFiles( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) { - PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil + PartitionDirectory(InternalRow.empty, +allFiles().filter(f => DataSourceUtils.isDataPath(f.getPath))) :: Nil --- End diff -- Done.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r204823137 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala --- @@ -148,6 +149,25 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + test("verify table size calculation is accurate") { +val checkSizeTable = "checkSizeTable" --- End diff -- Done.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r204656360 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala --- @@ -148,6 +149,25 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + test("verify table size calculation is accurate") { +val checkSizeTable = "checkSizeTable" --- End diff -- super nit: can you move this variable inside `withSQLConf`?
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r204655895 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala --- @@ -148,6 +148,19 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + test("verify table size calculation is accurate") { --- End diff -- We should make the title more descriptive. How about `SPARK-24626 parallelize location size calculation in Analyze Table command`?
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r204654664 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala --- @@ -59,14 +59,15 @@ abstract class PartitioningAwareFileIndex( override def listFiles( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) { - PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil + PartitionDirectory(InternalRow.empty, +allFiles().filter(f => DataSourceUtils.isDataPath(f.getPath))) :: Nil --- End diff -- super nit: ``` val files = allFiles().filter(f => DataSourceUtils.isDataPath(f.getPath)) PartitionDirectory(InternalRow.empty, files) :: Nil ```
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r204654083 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,26 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) } else { // Calculate table size as a sum of the visible partitions. See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) - partitions.map { p => -calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri) - }.sum + val paths = partitions.map(x => new Path(x.storage.locationUri.get)) + val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") + val pathFilter = new PathFilter with Serializable { +override def accept(path: Path): Boolean = { + val fileName = path.getName + (// Ignore metadata files starting with "_" +DataSourceUtils.isDataPath(path) && !fileName.startsWith(stagingDir)) +} + } + val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths, +sessionState.newHadoopConf(), pathFilter, +spark).flatMap(_._2) + fileStatusSeq.map(_.getLen).sum --- End diff -- How about this? ``` val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles( paths, sessionState.newHadoopConf(), SerializablePathFilter(stagingDir), spark) fileStatusSeq.flatMap(_._2.map(_.getLen)).sum ```
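The two ways of summing file lengths in this comment (flatten the listing first, or map to lengths inside the `flatMap`) can be compared with a small sketch on plain collections. It assumes the listing has one `(path, files)` pair per input path, as the `.flatMap(_._2)` in the diff suggests; `SizeSumSketch`, `FileEntry`, and `Listing` are hypothetical stand-ins, not Spark or Hadoop types.

```scala
object SizeSumSketch {
  // Hypothetical stand-in for Hadoop's FileStatus; only the length matters here.
  final case class FileEntry(name: String, len: Long)

  // Assumed shape of the bulk listing: one (partition path, files) pair per input path.
  type Listing = Seq[(String, Seq[FileEntry])]

  // Form used in the diff: flatten to files, then map to lengths and sum.
  def totalTwoStep(listing: Listing): Long =
    listing.flatMap(_._2).map(_.len).sum

  // Form suggested in the review: map to lengths inside the flatMap, then sum.
  def totalOneStep(listing: Listing): Long =
    listing.flatMap(_._2.map(_.len)).sum
}
```

Both forms visit every file once and return the same total; the difference is stylistic, since the suggested form avoids naming an intermediate sequence of file statuses.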
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r204652437 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,26 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) } else { // Calculate table size as a sum of the visible partitions. See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) - partitions.map { p => -calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri) - }.sum + val paths = partitions.map(x => new Path(x.storage.locationUri.get)) + val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") + val pathFilter = new PathFilter with Serializable { +override def accept(path: Path): Boolean = { + val fileName = path.getName + (// Ignore metadata files starting with "_" +DataSourceUtils.isDataPath(path) && !fileName.startsWith(stagingDir)) --- End diff -- How about this? ``` override def accept(path: Path): Boolean = { DataSourceUtils.isDataPath(path) && !path.getName.startsWith(stagingDir) } ```
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r204618663 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala --- @@ -148,6 +148,19 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + test("verify table size calculation is accurate") { --- End diff -- @maropu, Does this test look okay?
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r204618589 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala --- @@ -55,4 +57,11 @@ private[sql] object DataSourceV2Utils extends Logging { case _ => Map.empty } + + // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be --- End diff -- Moved it. I couldn't see that file before pulling the upstream master.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r204609271 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala --- @@ -55,4 +57,11 @@ private[sql] object DataSourceV2Utils extends Logging { case _ => Map.empty } + + // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be --- End diff -- Why do you use `DataSourceV2Utils` instead of `DataSourceUtils`?
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r204200662 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,27 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) } else { // Calculate table size as a sum of the visible partitions. See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) - partitions.map { p => -calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri) - }.sum + val paths = partitions.map(x => new Path(x.storage.locationUri.get)) + val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") + val pathFilter = new PathFilter with Serializable { +override def accept(path: Path): Boolean = { + val fileName = path.getName + (!fileName.startsWith(stagingDir) && +// Ignore metadata files starting with "_" +!fileName.startsWith("_")) --- End diff -- Done. Also, we are not doing this check when `calculateLocationSize` is called directly. I will file a separate PR for that, since it is not related to `AnalyzeTableCommand`.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r204196291 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,27 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) } else { // Calculate table size as a sum of the visible partitions. See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) - partitions.map { p => -calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri) - }.sum + val paths = partitions.map(x => new Path(x.storage.locationUri.get)) + val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") + val pathFilter = new PathFilter with Serializable { +override def accept(path: Path): Boolean = { + val fileName = path.getName + (!fileName.startsWith(stagingDir) && +// Ignore metadata files starting with "_" +!fileName.startsWith("_")) --- End diff -- How about `DataSourceUtils`?
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r204121451 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,27 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) } else { // Calculate table size as a sum of the visible partitions. See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) - partitions.map { p => -calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri) - }.sum + val paths = partitions.map(x => new Path(x.storage.locationUri.get)) + val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") + val pathFilter = new PathFilter with Serializable { +override def accept(path: Path): Boolean = { + val fileName = path.getName + (!fileName.startsWith(stagingDir) && +// Ignore metadata files starting with "_" +!fileName.startsWith("_")) --- End diff -- It is a generic method and only used within `PartitioningAwareFileIndex`, so we can move it to `CommandUtils`. Does that sound okay?
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r203953577 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,27 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) } else { // Calculate table size as a sum of the visible partitions. See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) - partitions.map { p => -calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri) - }.sum + val paths = partitions.map(x => new Path(x.storage.locationUri.get)) + val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") + val pathFilter = new PathFilter with Serializable { +override def accept(path: Path): Boolean = { + val fileName = path.getName + (!fileName.startsWith(stagingDir) && +// Ignore metadata files starting with "_" +!fileName.startsWith("_")) --- End diff -- How about just moving the function into somewhere that can be accessed from here?
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r203613041 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala --- @@ -148,6 +148,19 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + test("verify table size calculation is accurate") { --- End diff -- @maropu, I have fixed the test to verify if the calculation is being done in parallel. Can you review the change?
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r203456566 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,27 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) } else { // Calculate table size as a sum of the visible partitions. See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) - partitions.map { p => -calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri) - }.sum + val paths = partitions.map(x => new Path(x.storage.locationUri.get)) + val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") + val pathFilter = new PathFilter with Serializable { +override def accept(path: Path): Boolean = { + val fileName = path.getName + (!fileName.startsWith(stagingDir) && +// Ignore metadata files starting with "_" +!fileName.startsWith("_")) --- End diff -- We are not creating an instance of the `InMemoryFileIndex` class, so `isDataPath` is not accessible here. I think it would be simpler to add these checks here.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r203265365 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,27 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) } else { // Calculate table size as a sum of the visible partitions. See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) - partitions.map { p => -calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri) - }.sum + val paths = partitions.map(x => new Path(x.storage.locationUri.get)) + val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") + val pathFilter = new PathFilter with Serializable { +override def accept(path: Path): Boolean = { + val fileName = path.getName + (!fileName.startsWith(stagingDir) && +// Ignore metadata files starting with "_" +!fileName.startsWith("_")) --- End diff -- We already have a function that checks whether a path is a data file. Can we reuse it? https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala#L232
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r203264804 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala --- @@ -148,6 +148,19 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + test("verify table size calculation is accurate") { --- End diff -- I originally meant that it would be better to test whether the calculation is done in parallel, as in https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala#L99
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r201820832 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala --- @@ -162,7 +162,7 @@ object InMemoryFileIndex extends Logging { * * @return for each input path, the set of discovered files for the path */ - private def bulkListLeafFiles( + def bulkListLeafFiles( --- End diff -- Done.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r201820630 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,29 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + private case class SerializablePathFilter(stagingDir: String) +extends PathFilter with Serializable { +override def accept(path: Path): Boolean = { + val fileName = path.getName + (!fileName.startsWith(stagingDir) && +// Ignore metadata files starting with "_" +!fileName.startsWith("_")) +} + } + + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) } else { // Calculate table size as a sum of the visible partitions. See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) - partitions.map { p => -calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri) - }.sum + val paths = partitions.map(x => new Path(x.storage.locationUri.get)) + val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") --- End diff -- Done.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r201545004 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala --- @@ -162,7 +162,7 @@ object InMemoryFileIndex extends Logging { * * @return for each input path, the set of discovered files for the path */ - private def bulkListLeafFiles( + def bulkListLeafFiles( --- End diff -- Remove the unnecessary indent. Also, `private[sql]`?
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r201544879 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -47,15 +48,29 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + private case class SerializablePathFilter(stagingDir: String) +extends PathFilter with Serializable { +override def accept(path: Path): Boolean = { + val fileName = path.getName + (!fileName.startsWith(stagingDir) && +// Ignore metadata files starting with "_" +!fileName.startsWith("_")) +} + } + + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { +val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) } else { // Calculate table size as a sum of the visible partitions. See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) - partitions.map { p => -calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri) - }.sum + val paths = partitions.map(x => new Path(x.storage.locationUri.get)) + val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") --- End diff -- Since `SerializablePathFilter` is only used here, how about defining it as an anonymous class? ``` val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") val pathFilter = new PathFilter with Serializable { override def accept(path: Path): Boolean = ... } val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths, sessionState.newHadoopConf(), pathFilter, spark).flatMap(_._2) ```
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r200824025

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala ---
    @@ -47,15 +48,26 @@ object CommandUtils extends Logging {
         }
       }
     
    -  def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = {
    +    def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
    +
    +    val sessionState = spark.sessionState
    +    val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
    +
         if (catalogTable.partitionColumnNames.isEmpty) {
    -      calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
    +      calculateLocationSize(sessionState, catalogTable.identifier,
    +      catalogTable.storage.locationUri)
         } else {
           // Calculate table size as a sum of the visible partitions. See SPARK-21079
           val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
    -      partitions.map { p =>
    -        calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
    -      }.sum
    +      val paths = partitions.map(x => new Path(x.storage.locationUri.get.getPath))
    +      val pathFilter = new PathFilter {
    +        override def accept(path: Path): Boolean = {
    +          !path.getName.startsWith(stagingDir)
    +        }
    +      }
    +      val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths,
    +        sessionState.newHadoopConf(), pathFilter, spark).flatMap(x => x._2)
    --- End diff --

Thank you, I have made the changes. Can you review this?
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r200704192

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala ---
    @@ -47,15 +48,26 @@ object CommandUtils extends Logging {
         }
       }
     
    -  def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = {
    +    def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
    +
    +    val sessionState = spark.sessionState
    +    val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
    +
         if (catalogTable.partitionColumnNames.isEmpty) {
    -      calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
    +      calculateLocationSize(sessionState, catalogTable.identifier,
    +      catalogTable.storage.locationUri)
         } else {
           // Calculate table size as a sum of the visible partitions. See SPARK-21079
           val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
    -      partitions.map { p =>
    -        calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
    -      }.sum
    +      val paths = partitions.map(x => new Path(x.storage.locationUri.get.getPath))
    +      val pathFilter = new PathFilter {
    +        override def accept(path: Path): Boolean = {
    +          !path.getName.startsWith(stagingDir)
    +        }
    +      }
    +      val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths,
    +        sessionState.newHadoopConf(), pathFilter, spark).flatMap(x => x._2)
    --- End diff --

```Scala
class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serializable {
  override def accept(path: Path): Boolean = {
    val fileName = path.getName
    (!fileName.startsWith(stagingDir) &&
      // Ignore metadata files starting with "_" (for example, files created by
      // DirectoryAtomicCommitProtocol) when computing the location size
      !fileName.startsWith("_"))
  }
}
```
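The name check in the suggested filter can be tried in isolation. The following is a minimal sketch, not code from the pull request: `NonDataNameFilter` is a hypothetical helper that takes the last path component as a plain `String` (the value `path.getName` would return), and the sample file names are made up for illustration.

```scala
// Hypothetical stand-in for the body of PathFilterIgnoreNonData.accept.
// It works on the file name only, so it needs no Hadoop classes.
object NonDataNameFilter {
  def accept(fileName: String, stagingDir: String): Boolean =
    // Reject staging directories and metadata files starting with "_".
    !fileName.startsWith(stagingDir) && !fileName.startsWith("_")
}

object NonDataNameFilterDemo {
  def main(args: Array[String]): Unit = {
    // Default value of hive.exec.stagingdir used in the diff.
    val stagingDir = ".hive-staging"
    // Made-up names: a data file, a marker file, a staging dir, a partition dir.
    val names = Seq("part-00000.parquet", "_SUCCESS", ".hive-staging_hive_2018", "col=1")
    names.foreach { n =>
      println(s"$n -> ${NonDataNameFilter.accept(n, stagingDir)}")
    }
  }
}
```

Under these assumptions, only `part-00000.parquet` and `col=1` pass the check, so only their sizes would count toward the location size.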
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r200666542

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala ---
    @@ -47,15 +48,26 @@ object CommandUtils extends Logging {
         }
       }
     
    -  def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = {
    +    def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
    +
    +    val sessionState = spark.sessionState
    +    val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
    +
         if (catalogTable.partitionColumnNames.isEmpty) {
    -      calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
    +      calculateLocationSize(sessionState, catalogTable.identifier,
    +      catalogTable.storage.locationUri)
         } else {
           // Calculate table size as a sum of the visible partitions. See SPARK-21079
           val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
    -      partitions.map { p =>
    -        calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
    -      }.sum
    +      val paths = partitions.map(x => new Path(x.storage.locationUri.get.getPath))
    +      val pathFilter = new PathFilter {
    +        override def accept(path: Path): Boolean = {
    +          !path.getName.startsWith(stagingDir)
    +        }
    +      }
    +      val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths,
    +        sessionState.newHadoopConf(), pathFilter, spark).flatMap(x => x._2)
    --- End diff --

The above approach might not work either. The earlier implementation had a check that skipped recursive listing of files under certain directories (`stagingDir`), so a `PathFilter` might not be the right approach. Instead, I wanted to introduce a list of strings called `filterDir` as a new parameter to `bulkListLeafFiles`, which can be used to decide whether a particular directory should be recursed into.
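The `filterDir` idea, pruning whole directories during the recursive listing instead of filtering individual paths, can be illustrated on the local filesystem. This is only a sketch under stated assumptions: `PrunedListing`, `listLeafFiles`, and `totalSize` are hypothetical names, it uses `java.io.File` rather than Hadoop's `FileSystem`, and it is not the signature of `bulkListLeafFiles`.

```scala
import java.io.File

// Hypothetical illustration of directory pruning with a list of name prefixes.
object PrunedListing {
  // Recursively collect regular files under `dir`, never descending into a
  // directory whose name starts with one of the prefixes in `filterDir`.
  def listLeafFiles(dir: File, filterDir: Seq[String]): Seq[File] = {
    // listFiles() returns null for non-directories or on I/O errors.
    val children = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
    children.flatMap { child =>
      if (child.isDirectory) {
        if (filterDir.exists(prefix => child.getName.startsWith(prefix))) Seq.empty
        else listLeafFiles(child, filterDir)
      } else {
        Seq(child)
      }
    }
  }

  // Sum of the sizes, in bytes, of all files that survive the pruning.
  def totalSize(dir: File, filterDir: Seq[String]): Long =
    listLeafFiles(dir, filterDir).map(_.length()).sum
}
```

For example, `PrunedListing.totalSize(tableDir, Seq(".hive-staging"))` would ignore everything below a staging directory without ever listing its contents, which is the behaviour the comment describes for the earlier implementation.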
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r200544127

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala ---
    @@ -47,15 +48,26 @@ object CommandUtils extends Logging {
         }
       }
     
    -  def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = {
    +    def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
    +
    +    val sessionState = spark.sessionState
    +    val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
    +
         if (catalogTable.partitionColumnNames.isEmpty) {
    -      calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
    +      calculateLocationSize(sessionState, catalogTable.identifier,
    +      catalogTable.storage.locationUri)
         } else {
           // Calculate table size as a sum of the visible partitions. See SPARK-21079
           val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
    -      partitions.map { p =>
    -        calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
    -      }.sum
    +      val paths = partitions.map(x => new Path(x.storage.locationUri.get.getPath))
    +      val pathFilter = new PathFilter {
    +        override def accept(path: Path): Boolean = {
    +          !path.getName.startsWith(stagingDir)
    +        }
    +      }
    +      val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths,
    +        sessionState.newHadoopConf(), pathFilter, spark).flatMap(x => x._2)
    --- End diff --

The tests are passing, but this line is incorrect. @gatorsmile @maropu, `PathFilter` is not serializable. How do we pass a `PathFilter` in a serializable manner? I checked the code, and one other way is to use `FileInputFormat.getInputPathFilter`/`FileInputFormat.setInputPathFilter`, but I couldn't get it to work. Also, is it okay if we filter the `Seq[(Path, Seq[FileStatus])]` returned by `bulkListLeafFiles` and remove the `stagingDir` files?
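The serialization question can be reproduced without Spark or Hadoop. In the sketch below, `NameFilter` is a hypothetical stand-in for an interface that, like `PathFilter`, does not extend `Serializable`; the two implementations differ only in whether `Serializable` is mixed in, and `SerializationCheck` is an illustrative helper built on plain Java serialization.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException}
import java.io.{ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a filter interface that is not Serializable.
trait NameFilter {
  def accept(name: String): Boolean
}

// Plain implementation: Java serialization rejects it.
class PlainStagingFilter(stagingDir: String) extends NameFilter {
  override def accept(name: String): Boolean = !name.startsWith(stagingDir)
}

// Same logic with Serializable mixed in; the String field is serializable too.
class SerializableStagingFilter(stagingDir: String) extends NameFilter with Serializable {
  override def accept(name: String): Boolean = !name.startsWith(stagingDir)
}

object SerializationCheck {
  // Write the value with Java serialization and read it back.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // True if the value survives a round trip, false if it is not serializable.
  def isSerializable(value: AnyRef): Boolean =
    try { roundTrip(value); true } catch { case _: NotSerializableException => false }
}
```

Mixing in `Serializable` is enough here because the only captured state is a `String`. A filter that captured a non-serializable object, such as a Hadoop `Configuration`, would presumably still fail the same check.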
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21608#discussion_r200207191

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala ---
    @@ -47,22 +47,34 @@ object CommandUtils extends Logging {
         }
       }
     
    -  def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = {
    +    def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
    +
    +    val sessionState = spark.sessionState
    +    val serializableConfiguration = new SerializableConfiguration(sessionState.newHadoopConf())
    +    val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
    +
         if (catalogTable.partitionColumnNames.isEmpty) {
    -      calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
    +      calculateLocationSize(serializableConfiguration, catalogTable.identifier,
    +      catalogTable.storage.locationUri, stagingDir)
         } else {
           // Calculate table size as a sum of the visible partitions. See SPARK-21079
           val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
    -      partitions.map { p =>
    -        calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
    -      }.sum
    +      val numParallelism = Math.min(partitions.size,
    +        Math.min(spark.sparkContext.defaultParallelism, 1))
    +      spark.sparkContext.parallelize(partitions, numParallelism).mapPartitions {
    --- End diff --

The direction is right, but the implementation needs to be improved by using `InMemoryFileIndex.bulkListLeafFiles`.
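One arithmetic detail of the quoted diff is easy to check by hand: the inner `Math.min(defaultParallelism, 1)` can never exceed 1, so `numParallelism` as written is at most 1 regardless of the partition count. The sketch below evaluates the same expression with plain integers; `ParallelismCheck` and the sample inputs are made up for illustration, and the thread does not say whether `Math.max` was intended.

```scala
object ParallelismCheck {
  // Same expression as in the diff, with plain Ints standing in for
  // partitions.size and spark.sparkContext.defaultParallelism.
  def numParallelism(numPartitions: Int, defaultParallelism: Int): Int =
    Math.min(numPartitions, Math.min(defaultParallelism, 1))

  def main(args: Array[String]): Unit = {
    // Made-up inputs: (number of partitions, default parallelism).
    for ((n, d) <- Seq((1000, 8), (3, 200), (0, 8))) {
      println(s"partitions=$n defaultParallelism=$d -> ${numParallelism(n, d)}")
    }
  }
}
```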