[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21608


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-07 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r208423190
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala ---
@@ -49,4 +51,11 @@ object DataSourceUtils {
   }
 }
   }
+
+  // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
+  // counted as data files, so that they shouldn't participate partition discovery.
+  private[sql] def isDataPath(path: Path): Boolean = {
+    val name = path.getName
+    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
--- End diff --

Sounds good. I have made the changes.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r208408858
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala ---
@@ -49,4 +51,11 @@ object DataSourceUtils {
   }
 }
   }
+
+  // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
+  // counted as data files, so that they shouldn't participate partition discovery.
+  private[sql] def isDataPath(path: Path): Boolean = {
+    val name = path.getName
+    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
--- End diff --

Not sure what your earlier implementation was. I would prefer keeping the original code in `PartitioningAwareFileIndex.scala` unchanged and just adding a utility function `isDataPath` in `CommandUtils.scala`. Does this sound good to you?
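For illustration, a minimal sketch of what such a utility in `CommandUtils.scala` could look like; the wrapping object name is hypothetical and this is an assumed shape, not the merged code. It drops the `name.contains("=")` clause, which only matters for partition discovery:

```
import org.apache.hadoop.fs.Path

object CommandUtilsDataPathSketch {
  // Assumed shape of the utility: metadata files (leading "_") and hidden or
  // temporary files (leading ".") are not counted as data files. The
  // partition-discovery-specific `name.contains("=")` clause is dropped here.
  def isDataPath(path: Path): Boolean = {
    val name = path.getName
    !(name.startsWith("_") || name.startsWith("."))
  }
}
```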


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-07 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r208295767
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala ---
@@ -49,4 +51,11 @@ object DataSourceUtils {
   }
 }
   }
+
+  // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
+  // counted as data files, so that they shouldn't participate partition discovery.
+  private[sql] def isDataPath(path: Path): Boolean = {
+    val name = path.getName
+    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
--- End diff --

Should I use the earlier implementation with a simple if condition?


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r208269963
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala ---
@@ -49,4 +51,11 @@ object DataSourceUtils {
   }
 }
   }
+
+  // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
+  // counted as data files, so that they shouldn't participate partition discovery.
+  private[sql] def isDataPath(path: Path): Boolean = {
+    val name = path.getName
+    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
--- End diff --

Let us not use the same check as in `PartitioningAwareFileIndex.scala`. In this case, the data files are not related to the `name.contains("=")` condition, which only matters for partition discovery.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207091637
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1449,6 +1449,15 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val PARALLEL_FILE_LISTING_IN_COMMMANDS =
+    buildConf("spark.sql.parallelFileListingInCommands.enabled")
--- End diff --

It's OK to follow Xiao's decision.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207090891
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1872,6 +1872,8 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by 
default but ignores table properties like `TBLPROPERTIES (parquet.compression 
'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES 
(orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, 
too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties 
while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id 
int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would 
generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, 
the result would be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for 
better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, 
too. It means Spark uses its own ORC support by default instead of Hive SerDe. 
As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with 
Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's 
ORC data source table and ORC vectorization would be applied. To set `false` to 
`spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
   - In version 2.3 and earlier, CSV rows are considered as malformed if at 
least one column value in the row is malformed. CSV parser dropped such rows in 
the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 
2.4, CSV row is considered as malformed only when it contains malformed column 
values requested from CSV datasource, other values can be ignored. As an 
example, CSV file contains the "id,name" header and one row "1234". In Spark 
2.4, selection of the id column consists of a row with one column value 1234 
but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore 
the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to 
`false`.
+  - Since Spark 2.4, File listing for compute statistics is done in 
parallel by default. This can be disabled by setting 
`spark.sql.execution.computeStatsListFilesInParallel` to `False`.
+  - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and 
temporary files are not counted as data files when calculating table size 
during Statistics computation.
--- End diff --

ah, ok.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207090805
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1872,6 +1872,8 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by 
default but ignores table properties like `TBLPROPERTIES (parquet.compression 
'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES 
(orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, 
too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties 
while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id 
int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would 
generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, 
the result would be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for 
better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, 
too. It means Spark uses its own ORC support by default instead of Hive SerDe. 
As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with 
Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's 
ORC data source table and ORC vectorization would be applied. To set `false` to 
`spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
   - In version 2.3 and earlier, CSV rows are considered as malformed if at 
least one column value in the row is malformed. CSV parser dropped such rows in 
the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 
2.4, CSV row is considered as malformed only when it contains malformed column 
values requested from CSV datasource, other values can be ignored. As an 
example, CSV file contains the "id,name" header and one row "1234". In Spark 
2.4, selection of the id column consists of a row with one column value 1234 
but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore 
the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to 
`false`.
+  - Since Spark 2.4, File listing for compute statistics is done in 
parallel by default. This can be disabled by setting 
`spark.sql.execution.computeStatsListFilesInParallel` to `False`.
--- End diff --

Same as below. 


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207090543
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1449,6 +1449,15 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val PARALLEL_FILE_LISTING_IN_COMMMANDS =
+    buildConf("spark.sql.parallelFileListingInCommands.enabled")
--- End diff --

I can make this change.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207090467
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1872,6 +1872,8 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by 
default but ignores table properties like `TBLPROPERTIES (parquet.compression 
'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES 
(orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, 
too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties 
while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id 
int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would 
generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, 
the result would be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for 
better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, 
too. It means Spark uses its own ORC support by default instead of Hive SerDe. 
As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with 
Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's 
ORC data source table and ORC vectorization would be applied. To set `false` to 
`spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
   - In version 2.3 and earlier, CSV rows are considered as malformed if at 
least one column value in the row is malformed. CSV parser dropped such rows in 
the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 
2.4, CSV row is considered as malformed only when it contains malformed column 
values requested from CSV datasource, other values can be ignored. As an 
example, CSV file contains the "id,name" header and one row "1234". In Spark 
2.4, selection of the id column consists of a row with one column value 1234 
but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore 
the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to 
`false`.
+  - Since Spark 2.4, File listing for compute statistics is done in 
parallel by default. This can be disabled by setting 
`spark.sql.execution.computeStatsListFilesInParallel` to `False`.
+  - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and 
temporary files are not counted as data files when calculating table size 
during Statistics computation.
--- End diff --

@maropu The `calculateTotalSize` flow is invoked in other places too. For example, it is used in the `updateTableStats` method, which is called from a few places.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207074028
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1449,6 +1449,15 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val PARALLEL_FILE_LISTING_IN_COMMMANDS =
+    buildConf("spark.sql.parallelFileListingInCommands.enabled")
--- End diff --

Currently, in master, this listing happens only in the ANALYZE SQL command, so the name fits the case now. But if we later add a new interface for this listing (e.g., `SparkSession.analyzeTable`, `SparkSession.analyzeColumn`, ...), this name might confuse users a little. So, how about `spark.sql.parallelFileListingInStatsComputation.enabled`? @gatorsmile
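For illustration, a sketch of how such a renamed entry could be declared in `SQLConf.scala`; the val name, default value, and final doc wording are assumptions (the doc text borrows the wording suggested elsewhere in this thread):

```
// Inside object SQLConf (sketch only, not the merged definition):
val PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION =
  buildConf("spark.sql.parallelFileListingInStatsComputation.enabled")
    .internal()
    .doc("When true, SQL commands use parallel file listing, " +
      "as opposed to single thread listing. " +
      "This usually speeds up commands that need to list many directories.")
    .booleanConf
    .createWithDefault(true)
```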


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207071540
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1872,6 +1872,8 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by 
default but ignores table properties like `TBLPROPERTIES (parquet.compression 
'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES 
(orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, 
too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties 
while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id 
int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would 
generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, 
the result would be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for 
better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, 
too. It means Spark uses its own ORC support by default instead of Hive SerDe. 
As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with 
Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's 
ORC data source table and ORC vectorization would be applied. To set `false` to 
`spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
   - In version 2.3 and earlier, CSV rows are considered as malformed if at 
least one column value in the row is malformed. CSV parser dropped such rows in 
the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 
2.4, CSV row is considered as malformed only when it contains malformed column 
values requested from CSV datasource, other values can be ignored. As an 
example, CSV file contains the "id,name" header and one row "1234". In Spark 
2.4, selection of the id column consists of a row with one column value 1234 
but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore 
the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to 
`false`.
+  - Since Spark 2.4, File listing for compute statistics is done in 
parallel by default. This can be disabled by setting 
`spark.sql.execution.computeStatsListFilesInParallel` to `False`.
+  - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and 
temporary files are not counted as data files when calculating table size 
during Statistics computation.
--- End diff --

nit: `... during Statistics computation in the ANALYZE command.`?


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207071484
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1872,6 +1872,8 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by 
default but ignores table properties like `TBLPROPERTIES (parquet.compression 
'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES 
(orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, 
too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties 
while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id 
int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would 
generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, 
the result would be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for 
better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, 
too. It means Spark uses its own ORC support by default instead of Hive SerDe. 
As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with 
Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's 
ORC data source table and ORC vectorization would be applied. To set `false` to 
`spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
   - In version 2.3 and earlier, CSV rows are considered as malformed if at 
least one column value in the row is malformed. CSV parser dropped such rows in 
the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 
2.4, CSV row is considered as malformed only when it contains malformed column 
values requested from CSV datasource, other values can be ignored. As an 
example, CSV file contains the "id,name" header and one row "1234". In Spark 
2.4, selection of the id column consists of a row with one column value 1234 
but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore 
the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to 
`false`.
+  - Since Spark 2.4, File listing for compute statistics is done in 
parallel by default. This can be disabled by setting 
`spark.sql.execution.computeStatsListFilesInParallel` to `False`.
--- End diff --

nit: `File listing for compute statistics in the ANALYZE command...`?


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207035320
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -78,7 +93,8 @@ object CommandUtils extends Logging {
   val size = if (fileStatus.isDirectory) {
 fs.listStatus(path)
   .map { status =>
-if (!status.getPath.getName.startsWith(stagingDir)) {
+if (!status.getPath.getName.startsWith(stagingDir) &&
+  DataSourceUtils.isDataPath(path)) {
--- End diff --

Added a line to migration doc.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207035227
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1449,6 +1449,13 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val COMPUTE_STATS_LIST_FILES_IN_PARALLEL =
+    buildConf("spark.sql.execution.computeStatsListFilesInParallel")
+      .internal()
+      .doc("If True, File listing for compute statistics is done in parallel.")
--- End diff --

Thanks! I have made this change.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207035140
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1449,6 +1449,13 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val COMPUTE_STATS_LIST_FILES_IN_PARALLEL =
+    buildConf("spark.sql.execution.computeStatsListFilesInParallel")
--- End diff --

Done.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207020824
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -78,7 +93,8 @@ object CommandUtils extends Logging {
   val size = if (fileStatus.isDirectory) {
 fs.listStatus(path)
   .map { status =>
-if (!status.getPath.getName.startsWith(stagingDir)) {
+if (!status.getPath.getName.startsWith(stagingDir) &&
+  DataSourceUtils.isDataPath(path)) {
--- End diff --

This is also a behavior change. Could you document it too?


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207017248
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1449,6 +1449,13 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val COMPUTE_STATS_LIST_FILES_IN_PARALLEL =
+    buildConf("spark.sql.execution.computeStatsListFilesInParallel")
+      .internal()
+      .doc("If True, File listing for compute statistics is done in parallel.")
--- End diff --

When true, SQL commands use parallel file listing, as opposed to single 
thread listing. This usually speeds up commands that need to list many 
directories.
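For illustration, a hedged sketch of how `calculateTotalSize` could branch on such a flag; the conf key, variable names, and sequential fallback shown here are assumptions based on the quoted diff, not the merged code:

```
// Assumed to run inside CommandUtils.calculateTotalSize, where `paths`,
// `pathFilter`, `partitions`, `sessionState`, `catalogTable`, and `spark`
// are already defined as in the quoted diff.
val parallelListingEnabled = sessionState.conf.getConfString(
  "spark.sql.execution.computeStatsListFilesInParallel", "true").toBoolean

val totalSize: BigInt =
  if (parallelListingEnabled) {
    // One distributed listing job over all partition locations.
    InMemoryFileIndex.bulkListLeafFiles(paths, sessionState.newHadoopConf(), pathFilter, spark)
      .flatMap(_._2.map(_.getLen)).sum
  } else {
    // Fall back to the old per-partition, single-threaded listing.
    partitions.map { p =>
      calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
    }.sum
  }
```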


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207017122
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1449,6 +1449,13 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val COMPUTE_STATS_LIST_FILES_IN_PARALLEL =
+    buildConf("spark.sql.execution.computeStatsListFilesInParallel")
--- End diff --

How about `spark.sql.parallelFileListingInCommands.enabled`?


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-27 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r205916878
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,23 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new Path(x.storage.locationUri.get))
+  val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+  val pathFilter = new PathFilter with Serializable {
+override def accept(path: Path): Boolean = {
+  DataSourceUtils.isDataPath(path) && 
!path.getName.startsWith(stagingDir)
--- End diff --

This is a private method in `PartitioningAwareFileIndex`. Should I add this change to the migration guide?


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-27 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r205916762
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,23 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
--- End diff --

I have made the changes for this in calculateLocationSize.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-26 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r205546260
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,23 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
--- End diff --

Got it, maybe we can add the filter inside `calculateLocationSize`. Let me know if that's okay and I can make the changes.
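For illustration, a rough sketch of how such a filter could live inside `calculateLocationSize` via a recursive helper; the helper name and the surrounding scope (`fs`, `stagingDir`) are assumptions, not the merged code:

```
// Sketch of a recursive helper inside calculateLocationSize; `fs` and
// `stagingDir` are assumed to be in scope as in the quoted diff, and
// DataSourceUtils.isDataPath comes from this PR.
def getPathSize(fs: FileSystem, path: Path): Long = {
  val fileStatus = fs.getFileStatus(path)
  if (fileStatus.isDirectory) {
    fs.listStatus(path).map { status =>
      if (!status.getPath.getName.startsWith(stagingDir) &&
          DataSourceUtils.isDataPath(status.getPath)) {
        // Recurse so nested data files are counted and non-data files skipped.
        getPathSize(fs, status.getPath)
      } else {
        0L
      }
    }.sum
  } else {
    fileStatus.getLen
  }
}
```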


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r205545427
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,23 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
--- End diff --

Let us make them consistent and only count the data files.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-26 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r205545098
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,23 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
--- End diff --

If not, we have to make changes in all those places too. 


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-26 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r205544840
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,23 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
--- End diff --

This filtering of temp files is not done in other places where `calculateLocationSize` is called directly (e.g. `AlterTableAddPartition`). Should we skip checking for the temp files since they don't make a lot of difference to the total size?


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r205542099
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,23 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
--- End diff --

It sounds like we are not following the same file filtering for 
non-partition tables. 


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r205539125
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,23 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new Path(x.storage.locationUri.get))
+  val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+  val pathFilter = new PathFilter with Serializable {
+override def accept(path: Path): Boolean = {
+  DataSourceUtils.isDataPath(path) && 
!path.getName.startsWith(stagingDir)
--- End diff --

This is a behavior change. Could you document the changes in the migration 
guide?


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-24 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r204823567
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,26 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new Path(x.storage.locationUri.get))
+  val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+  val pathFilter = new PathFilter with Serializable {
+override def accept(path: Path): Boolean = {
+  val fileName = path.getName
+  (// Ignore metadata files starting with "_"
+DataSourceUtils.isDataPath(path) && 
!fileName.startsWith(stagingDir))
--- End diff --

Fixed.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-24 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r204823533
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,26 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new Path(x.storage.locationUri.get))
+  val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+  val pathFilter = new PathFilter with Serializable {
+override def accept(path: Path): Boolean = {
+  val fileName = path.getName
+  (// Ignore metadata files starting with "_"
+DataSourceUtils.isDataPath(path) && 
!fileName.startsWith(stagingDir))
+}
+  }
+  val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths,
+sessionState.newHadoopConf(), pathFilter,
+spark).flatMap(_._2)
+  fileStatusSeq.map(_.getLen).sum
--- End diff --

Fixed.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-24 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r204823422
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -148,6 +148,19 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("verify table size calculation is accurate") {
--- End diff --

Done. 


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-24 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r204823485
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 ---
@@ -59,14 +59,15 @@ abstract class PartitioningAwareFileIndex(
   override def listFiles(
   partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): 
Seq[PartitionDirectory] = {
 val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) 
{
-  PartitionDirectory(InternalRow.empty, allFiles().filter(f => 
isDataPath(f.getPath))) :: Nil
+  PartitionDirectory(InternalRow.empty,
+allFiles().filter(f => DataSourceUtils.isDataPath(f.getPath))) :: 
Nil
--- End diff --

Done.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-24 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r204823137
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -148,6 +149,25 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("verify table size calculation is accurate") {
+val checkSizeTable = "checkSizeTable"
--- End diff --

Done.



---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-24 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r204656360
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -148,6 +149,25 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("verify table size calculation is accurate") {
+val checkSizeTable = "checkSizeTable"
--- End diff --

super nit: can you move this variable inside `withSQLConf`?


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-24 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r204655895
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -148,6 +148,19 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("verify table size calculation is accurate") {
--- End diff --

We should make the title more descriptive. How about `SPARK-24626 parallelize location size calculation in Analyze Table command`?
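For illustration, a rough shape the renamed test could take; the conf key, SQL statements, and assertion here are assumptions, not the merged test:

```
test("SPARK-24626 parallelize location size calculation in Analyze Table command") {
  // The conf key below is an assumption following the naming discussed in this thread.
  withSQLConf("spark.sql.parallelFileListingInStatsComputation.enabled" -> "true") {
    val checkSizeTable = "checkSizeTable"
    withTable(checkSizeTable) {
      sql(s"CREATE TABLE $checkSizeTable (key STRING, value STRING) " +
        "PARTITIONED BY (ds STRING)")
      sql(s"INSERT INTO TABLE $checkSizeTable PARTITION (ds='2010-01-01') " +
        "SELECT 'k1', 'v1'")
      sql(s"ANALYZE TABLE $checkSizeTable COMPUTE STATISTICS NOSCAN")
      // With NOSCAN, sizeInBytes comes from the file listing done by calculateTotalSize.
      val stats = spark.table(checkSizeTable).queryExecution.analyzed.stats
      assert(stats.sizeInBytes > 0)
    }
  }
}
```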


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-24 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r204654664
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 ---
@@ -59,14 +59,15 @@ abstract class PartitioningAwareFileIndex(
   override def listFiles(
   partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): 
Seq[PartitionDirectory] = {
 val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) 
{
-  PartitionDirectory(InternalRow.empty, allFiles().filter(f => 
isDataPath(f.getPath))) :: Nil
+  PartitionDirectory(InternalRow.empty,
+allFiles().filter(f => DataSourceUtils.isDataPath(f.getPath))) :: 
Nil
--- End diff --

super nit: 
```
  val files = allFiles().filter(f => 
DataSourceUtils.isDataPath(f.getPath))
  PartitionDirectory(InternalRow.empty, files) :: Nil
```


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-24 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r204654083
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,26 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new Path(x.storage.locationUri.get))
+  val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+  val pathFilter = new PathFilter with Serializable {
+override def accept(path: Path): Boolean = {
+  val fileName = path.getName
+  (// Ignore metadata files starting with "_"
+DataSourceUtils.isDataPath(path) && 
!fileName.startsWith(stagingDir))
+}
+  }
+  val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths,
+sessionState.newHadoopConf(), pathFilter,
+spark).flatMap(_._2)
+  fileStatusSeq.map(_.getLen).sum
--- End diff --

How about this?
```
  val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
paths, sessionState.newHadoopConf(), 
SerializablePathFilter(stagingDir), spark)
  fileStatusSeq.flatMap(_._2.map(_.getLen)).sum
```
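`SerializablePathFilter` above is not an existing Spark class; a hedged sketch of what such a helper could look like:

```
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.spark.sql.execution.datasources.DataSourceUtils

// Hypothetical helper named in the suggestion above; carrying stagingDir as a
// constructor argument keeps the filter serializable for the distributed listing job.
case class SerializablePathFilter(stagingDir: String) extends PathFilter with Serializable {
  override def accept(path: Path): Boolean = {
    DataSourceUtils.isDataPath(path) && !path.getName.startsWith(stagingDir)
  }
}
```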


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-24 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r204652437
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,26 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new Path(x.storage.locationUri.get))
+  val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+  val pathFilter = new PathFilter with Serializable {
+override def accept(path: Path): Boolean = {
+  val fileName = path.getName
+  (// Ignore metadata files starting with "_"
+DataSourceUtils.isDataPath(path) && 
!fileName.startsWith(stagingDir))
--- End diff --

How about this?
```
override def accept(path: Path): Boolean = {
  DataSourceUtils.isDataPath(path) && !path.getName.startsWith(stagingDir))
}
```



---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-23 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r204618663
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -148,6 +148,19 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("verify table size calculation is accurate") {
--- End diff --

@maropu, Does this test look okay?


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-23 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r204618589
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
 ---
@@ -55,4 +57,11 @@ private[sql] object DataSourceV2Utils extends Logging {
 
 case _ => Map.empty
   }
+
+  // SPARK-15895: Metadata files (e.g. Parquet summary files) and 
temporary files should not be
--- End diff --

Moved it. I couldn't see that file before pulling from upstream master.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-23 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r204609271
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
 ---
@@ -55,4 +57,11 @@ private[sql] object DataSourceV2Utils extends Logging {
 
 case _ => Map.empty
   }
+
+  // SPARK-15895: Metadata files (e.g. Parquet summary files) and 
temporary files should not be
--- End diff --

Why do you use `DataSourceV2Utils` instead of `DataSourceUtils`?


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-20 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r204200662
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,27 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new Path(x.storage.locationUri.get))
+  val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+  val pathFilter = new PathFilter with Serializable {
+override def accept(path: Path): Boolean = {
+  val fileName = path.getName
+  (!fileName.startsWith(stagingDir) &&
+// Ignore metadata files starting with "_"
+!fileName.startsWith("_"))
--- End diff --

Done. 
Also, we are not doing this check when `calculateLocationSize` is called 
directly. I will file a different PR for this as this is not related to 
AnalyzeTableCommand. 


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-20 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r204196291
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,27 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new Path(x.storage.locationUri.get))
+  val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+  val pathFilter = new PathFilter with Serializable {
+override def accept(path: Path): Boolean = {
+  val fileName = path.getName
+  (!fileName.startsWith(stagingDir) &&
+// Ignore metadata files starting with "_"
+!fileName.startsWith("_"))
--- End diff --

How about `DataSourceUtils`?


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-20 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r204121451
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,27 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new Path(x.storage.locationUri.get))
+  val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+  val pathFilter = new PathFilter with Serializable {
+override def accept(path: Path): Boolean = {
+  val fileName = path.getName
+  (!fileName.startsWith(stagingDir) &&
+// Ignore metadata files starting with "_"
+!fileName.startsWith("_"))
--- End diff --

It is a generic method that is only used within `PartitioningAwareFileIndex`, so we can move it to `CommandUtils`. Does that sound okay?


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-20 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r203953577
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,27 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new Path(x.storage.locationUri.get))
+  val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+  val pathFilter = new PathFilter with Serializable {
+override def accept(path: Path): Boolean = {
+  val fileName = path.getName
+  (!fileName.startsWith(stagingDir) &&
+// Ignore metadata files starting with "_"
+!fileName.startsWith("_"))
--- End diff --

How about just moving the function somewhere it can be accessed from here?


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-18 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r203613041
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -148,6 +148,19 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("verify table size calculation is accurate") {
--- End diff --

@maropu, I have fixed the test to verify that the calculation is done in 
parallel. Can you review the change?


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-18 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r203456566
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,27 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new Path(x.storage.locationUri.get))
+  val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+  val pathFilter = new PathFilter with Serializable {
+override def accept(path: Path): Boolean = {
+  val fileName = path.getName
+  (!fileName.startsWith(stagingDir) &&
+// Ignore metadata files starting with "_"
+!fileName.startsWith("_"))
--- End diff --

Since we are not creating an instance of the `InMemoryFileIndex` class, 
`isDataPath` is not accessible here. I think it would be simpler to add these 
checks here.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-17 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r203265365
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,27 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new Path(x.storage.locationUri.get))
+  val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+  val pathFilter = new PathFilter with Serializable {
+override def accept(path: Path): Boolean = {
+  val fileName = path.getName
+  (!fileName.startsWith(stagingDir) &&
+// Ignore metadata files starting with "_"
+!fileName.startsWith("_"))
--- End diff --

We already have a function that checks whether a path is a data file or not. 
Can we reuse it? 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala#L232


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-17 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r203264804
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -148,6 +148,19 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("verify table size calculation is accurate") {
--- End diff --

I originally meant that it would be better to test whether the calculation is 
done in parallel, as in 
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala#L99
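
For reference, a rough sketch of that kind of check (not the final test), 
assuming it sits inside `StatisticsSuite` so that `spark`, `sql`, `withTable`, 
and `withSQLConf` are available; the table name and the metric-based assertion 
are illustrative:

```scala
// Rough sketch only: verify the size calculation goes through the parallel
// listing code path, along the lines of the FileIndexSuite test linked above.
// Assumes this lives in StatisticsSuite; names below are illustrative.
test("table size calculation lists partition files in parallel (sketch)") {
  import org.apache.spark.metrics.source.HiveCatalogMetrics

  // A low threshold forces bulkListLeafFiles to launch a distributed listing
  // job once the number of partition paths exceeds it.
  withSQLConf("spark.sql.sources.parallelPartitionDiscovery.threshold" -> "1") {
    withTable("sizeTest") {
      sql("CREATE TABLE sizeTest (a INT) PARTITIONED BY (p INT) STORED AS PARQUET")
      sql("INSERT INTO sizeTest PARTITION (p = 1) SELECT 1")
      sql("INSERT INTO sizeTest PARTITION (p = 2) SELECT 2")

      HiveCatalogMetrics.reset()
      sql("ANALYZE TABLE sizeTest COMPUTE STATISTICS NOSCAN")
      // The counter is bumped each time a parallel listing job is launched.
      assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() > 0)
    }
  }
}
```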


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-11 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r201820832
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
 ---
@@ -162,7 +162,7 @@ object InMemoryFileIndex extends Logging {
*
* @return for each input path, the set of discovered files for the path
*/
-  private def bulkListLeafFiles(
+   def bulkListLeafFiles(
--- End diff --

Done.


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-11 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r201820630
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,29 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  private case class SerializablePathFilter(stagingDir: String)
+extends PathFilter with Serializable {
+override def accept(path: Path): Boolean = {
+  val fileName = path.getName
+  (!fileName.startsWith(stagingDir) &&
+// Ignore metadata files starting with "_"
+!fileName.startsWith("_"))
+}
+  }
+
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new Path(x.storage.locationUri.get))
+  val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
--- End diff --

Done. 


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-10 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r201545004
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
 ---
@@ -162,7 +162,7 @@ object InMemoryFileIndex extends Logging {
*
* @return for each input path, the set of discovered files for the path
*/
-  private def bulkListLeafFiles(
+   def bulkListLeafFiles(
--- End diff --

Remove the unnecessary indent. Also, `private[sql]`?


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-10 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r201544879
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,29 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+  private case class SerializablePathFilter(stagingDir: String)
+extends PathFilter with Serializable {
+override def accept(path: Path): Boolean = {
+  val fileName = path.getName
+  (!fileName.startsWith(stagingDir) &&
+// Ignore metadata files starting with "_"
+!fileName.startsWith("_"))
+}
+  }
+
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): 
BigInt = {
+val sessionState = spark.sessionState
 if (catalogTable.partitionColumnNames.isEmpty) {
   calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new Path(x.storage.locationUri.get))
+  val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
--- End diff --

Since `SerializablePathFilter` is only used here, how about defining it as 
an anonymous class?
```
  val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
  val pathFilter = new PathFilter with Serializable {
override def accept(path: Path): Boolean = ...
  }
  val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths,
sessionState.newHadoopConf(), pathFilter,
spark).flatMap(_._2)
```


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-07 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r200824025
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,26 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+def calculateTotalSize(spark: SparkSession, catalogTable: 
CatalogTable): BigInt = {
+
+val sessionState = spark.sessionState
+val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+
 if (catalogTable.partitionColumnNames.isEmpty) {
-  calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
+  calculateLocationSize(sessionState, catalogTable.identifier,
+  catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new 
Path(x.storage.locationUri.get.getPath))
+  val pathFilter = new PathFilter {
+override def accept(path: Path): Boolean = {
+  !path.getName.startsWith(stagingDir)
+}
+  }
+  val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths,
+sessionState.newHadoopConf(), pathFilter, spark).flatMap(x => x._2)
--- End diff --

Thank you, I have made the changes. Can you review this? 


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-06 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r200704192
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,26 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+def calculateTotalSize(spark: SparkSession, catalogTable: 
CatalogTable): BigInt = {
+
+val sessionState = spark.sessionState
+val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+
 if (catalogTable.partitionColumnNames.isEmpty) {
-  calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
+  calculateLocationSize(sessionState, catalogTable.identifier,
+  catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new 
Path(x.storage.locationUri.get.getPath))
+  val pathFilter = new PathFilter {
+override def accept(path: Path): Boolean = {
+  !path.getName.startsWith(stagingDir)
+}
+  }
+  val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths,
+sessionState.newHadoopConf(), pathFilter, spark).flatMap(x => x._2)
--- End diff --

```Scala
class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with 
Serializable {
  override def accept(path: Path): Boolean = {
val fileName = path.getName
(!fileName.startsWith(stagingDir) &&
  // Ignore metadata files starting with "_" (for example, files 
created by
  // DirectoryAtomicCommitProtocol) when computing the location size
  !fileName.startsWith("_"))
  }
}
```
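
For context, a hedged sketch of how such a filter might be wired into the 
partition-size calculation; `paths`, `sessionState`, and `spark` are taken 
from the surrounding diff, and the `bulkListLeafFiles` signature is assumed 
from the snippets quoted in this thread:

```scala
// Hedged sketch: plug the serializable filter into the bulk listing and sum
// the surviving file lengths. Names mirror the surrounding diff.
val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
val pathFilter = new PathFilterIgnoreNonData(stagingDir)
val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
  paths, sessionState.newHadoopConf(), pathFilter, spark).flatMap(_._2)
val totalSize = fileStatusSeq.map(_.getLen).sum
```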


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-06 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r200666542
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,26 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+def calculateTotalSize(spark: SparkSession, catalogTable: 
CatalogTable): BigInt = {
+
+val sessionState = spark.sessionState
+val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+
 if (catalogTable.partitionColumnNames.isEmpty) {
-  calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
+  calculateLocationSize(sessionState, catalogTable.identifier,
+  catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new 
Path(x.storage.locationUri.get.getPath))
+  val pathFilter = new PathFilter {
+override def accept(path: Path): Boolean = {
+  !path.getName.startsWith(stagingDir)
+}
+  }
+  val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths,
+sessionState.newHadoopConf(), pathFilter, spark).flatMap(x => x._2)
--- End diff --

The above approach might not work either. In the earlier implementation there 
was a check that prevented recursively listing files under certain directories 
(`stagingDir`), so passing a `PathFilter` might not be the right approach. 

So I wanted to introduce a list of strings called `filterDir` as a new 
parameter to `bulkListLeafFiles`, which can be used to decide whether a 
particular directory should be recursed into. 
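
Roughly, a hypothetical sketch of what that could look like inside the 
recursive listing; `filterDir` is the proposed new parameter, and the 
surrounding `listLeafFiles` plumbing is elided:

```scala
import org.apache.hadoop.fs.FileStatus

// Hypothetical sketch only: `filterDir` is the proposed list of directory-name
// prefixes (e.g. the Hive staging dir) that should not be recursed into.
def shouldRecurse(status: FileStatus, filterDir: Seq[String]): Boolean = {
  val name = status.getPath.getName
  status.isDirectory && !filterDir.exists(prefix => name.startsWith(prefix))
}

// e.g. filterDir = Seq(".hive-staging", "_temporary")
```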


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-05 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r200544127
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,15 +48,26 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+def calculateTotalSize(spark: SparkSession, catalogTable: 
CatalogTable): BigInt = {
+
+val sessionState = spark.sessionState
+val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+
 if (catalogTable.partitionColumnNames.isEmpty) {
-  calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
+  calculateLocationSize(sessionState, catalogTable.identifier,
+  catalogTable.storage.locationUri)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val paths = partitions.map(x => new 
Path(x.storage.locationUri.get.getPath))
+  val pathFilter = new PathFilter {
+override def accept(path: Path): Boolean = {
+  !path.getName.startsWith(stagingDir)
+}
+  }
+  val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths,
+sessionState.newHadoopConf(), pathFilter, spark).flatMap(x => x._2)
--- End diff --

The tests are passing, but this line is incorrect. 

@gatorsmile @maropu, `PathFilter` is not serializable. How do we pass a 
`PathFilter` in a serializable manner? I checked the code, and one other 
option is to use 
`FileInputFormat.getInputPathFilter`/`FileInputFormat.setInputPathFilter`, but 
I couldn't get it to work. 

Also, is it okay if we filter the `Seq[(Path, Seq[FileStatus])]` returned 
by `bulkListLeafFiles` and remove the `stagingDir` files? 
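
On the second question, a hedged sketch of that post-filtering alternative: 
list with a trivially serializable accept-all filter, then drop staging and 
metadata entries from the returned sequence. Names and the `bulkListLeafFiles` 
signature are assumed from the surrounding diff:

```scala
// Hedged sketch of the post-filtering alternative. `paths`, `stagingDir`,
// `sessionState`, and `spark` come from the diff above.
val acceptAll = new PathFilter with Serializable {
  override def accept(path: Path): Boolean = true
}
val listed: Seq[(Path, Seq[FileStatus])] =
  InMemoryFileIndex.bulkListLeafFiles(paths, sessionState.newHadoopConf(), acceptAll, spark)
val totalSize = listed.flatMap(_._2)
  .filterNot { status =>
    val name = status.getPath.getName
    name.startsWith(stagingDir) || name.startsWith("_")
  }
  .map(_.getLen).sum
```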


---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r200207191
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -47,22 +47,34 @@ object CommandUtils extends Logging {
 }
   }
 
-  def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): BigInt = {
+def calculateTotalSize(spark: SparkSession, catalogTable: 
CatalogTable): BigInt = {
+
+val sessionState = spark.sessionState
+val serializableConfiguration = new 
SerializableConfiguration(sessionState.newHadoopConf())
+val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+
 if (catalogTable.partitionColumnNames.isEmpty) {
-  calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
+  calculateLocationSize(serializableConfiguration, 
catalogTable.identifier,
+  catalogTable.storage.locationUri, stagingDir)
 } else {
   // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
   val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-  partitions.map { p =>
-calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
-  }.sum
+  val numParallelism = Math.min(partitions.size,
+Math.min(spark.sparkContext.defaultParallelism, 1))
+  spark.sparkContext.parallelize(partitions, 
numParallelism).mapPartitions {
--- End diff --

The direction is right, but the implementation needs to be improved by using 
`InMemoryFileIndex.bulkListLeafFiles`.


---
