[GitHub] spark issue #22905: [SPARK-25894][SQL] Add a ColumnarFileFormat type which r...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22905 I think I've made my case for this patch as best I can. It does not appear this PR has unanimous support, but I continue to believe we should merge it to master. So where do we take it from here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22880: [SPARK-25407][SQL] Ensure we pass a compatible pruned sc...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22880 Hi @dbtsai @HyukjinKwon @gatorsmile @viirya. Can we merge this to master? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r233180347 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -130,8 +130,8 @@ private[parquet] class ParquetRowConverter( extends ParquetGroupConverter(updater) with Logging { assert( -parquetType.getFieldCount == catalystType.length, -s"""Field counts of the Parquet schema and the Catalyst schema don't match: +parquetType.getFieldCount <= catalystType.length, +s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema: --- End diff -- Why do you ask? Is it for safety, clarity? My concern is around reducing complexity, but I'm not strictly against this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r233179076 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -202,8 +204,12 @@ private[parquet] class ParquetRowConverter( override def start(): Unit = { var i = 0 -while (i < currentRow.numFields) { +while (i < fieldConverters.length) { fieldConverters(i).updater.start() + i += 1 +} +i = 0 +while (i < currentRow.numFields) { --- End diff -- Yes, but I think it's clearer this way. If @viirya has an opinion either way I'll take it as a "tie-breaker". --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22905: [SPARK-25894][SQL] Add a ColumnarFileFormat type ...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22905#discussion_r233175025 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -306,7 +306,15 @@ case class FileSourceScanExec( withOptPartitionCount } -withSelectedBucketsCount +val withOptColumnCount = relation.fileFormat match { + case columnar: ColumnarFileFormat => +val sqlConf = relation.sparkSession.sessionState.conf +val columnCount = columnar.columnCountForSchema(sqlConf, requiredSchema) +withSelectedBucketsCount + ("ColumnCount" -> columnCount.toString) --- End diff -- I'll reiterate a sample use case: > Consider also the case of the beeline user connecting to a multiuser thriftserver. They are pretty far from the log file, whereas running an 'explain' is right there in the terminal. This also matters to users planning/debugging queries in a Jupyter notebook, as we have in VideoAmp. The LOE for these users to go to a driver log file is quite high by comparison to inspecting a query plan. When you refer to logging, which log are you referring to? When would this information be logged? And at what log level? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
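To make the use case above concrete, here is the kind of check a notebook or beeline user would run, with hypothetical output showing the proposed `ColumnCount` entry (the table name and plan text are illustrative, not taken from the patch):

```scala
// Hypothetical, e.g. in spark-shell or a notebook: inspect the physical plan directly
// instead of hunting for the corresponding message in a driver log.
spark.sql("SELECT name.first FROM contacts").explain()
// == Physical Plan ==
// ... FileScan parquet default.contacts[name#10] Format: Parquet, ColumnCount: 1,
//     ReadSchema: struct<name:struct<first:string>> ...
```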
[GitHub] spark issue #22880: [SPARK-25407][SQL] Ensure we pass a compatible pruned sc...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22880 @gatorsmile How do you feel about merging this in? Anyone else I should ping for review? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22880: [SPARK-25407][SQL] Ensure we pass a compatible pruned sc...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22880 Can someone with Jenkins retest privileges please kick off a retest? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22880: [SPARK-25407][SQL] Ensure we pass a compatible pruned sc...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22880 Jenkins retest please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r231249401 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter( override def start(): Unit = { var i = 0 -while (i < currentRow.numFields) { +while (i < fieldConverters.length) { fieldConverters(i).updater.start() currentRow.setNullAt(i) --- End diff -- > I'm going to push a new commit keeping the current code but with a brief explanatory comment. On further careful consideration, I believe that separating the calls to `currentRow.setNullAt(i)` into their own loop actually won't incur any significant performance degradation, if any at all. The performance of the `start()` method is dominated by the calls to `fieldConverters(i).updater.start()` and `currentRow.setNullAt(i)`. Putting the latter calls into their own loop won't change the count of those method calls, just the order. @viirya LMK if you disagree with my analysis. I will push a new commit with separate while loops. I won't use the more elegant `(0 until currentRow.numFields).foreach(currentRow.setNullAt)` because that's not a loop, and I doubt either the Spark or Hotspot optimizer can turn that into a loop. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
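For readers weighing that trade-off, here is a small self-contained sketch of the two nulling variants discussed above, using a `SpecificInternalRow` as a stand-in for the converter's `currentRow`:

```scala
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types._

val currentRow = new SpecificInternalRow(Seq(IntegerType, StringType, DoubleType))

// Variant the author decided against: concise, but goes through Range#foreach and a
// function value rather than a plain counted loop.
(0 until currentRow.numFields).foreach(currentRow.setNullAt)

// Variant kept in the patch: an explicit while loop that does not rely on the JIT
// turning a higher-order call into a counted loop on this hot path.
var j = 0
while (j < currentRow.numFields) {
  currentRow.setNullAt(j)
  j += 1
}
```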
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r231243760 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter( override def start(): Unit = { var i = 0 -while (i < currentRow.numFields) { +while (i < fieldConverters.length) { fieldConverters(i).updater.start() currentRow.setNullAt(i) --- End diff -- Thank you both for your feedback. > Seems It can save some redundant iterations. That was my motivation in writing the code this way. While the code is not as clear as it could be, it is very performance critical. I'm going to push a new commit keeping the current code but with a brief explanatory comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22880: [SPARK-25407][SQL] Ensure we pass a compatible pruned sc...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22880 cc @HyukjinKwon Would you like to review this PR? It's a bug fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r230916138 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter( override def start(): Unit = { var i = 0 -while (i < currentRow.numFields) { +while (i < fieldConverters.length) { fieldConverters(i).updater.start() currentRow.setNullAt(i) --- End diff -- @viirya @attilapiros Hi guys. Does my explanation make sense? If so, do you want me to change the code as I suggested or leave it as-is in the current PR commit? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22905: [SPARK-25894][SQL] Add a ColumnarFileFormat type ...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22905#discussion_r230914377 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -306,7 +306,15 @@ case class FileSourceScanExec( withOptPartitionCount } -withSelectedBucketsCount +val withOptColumnCount = relation.fileFormat match { + case columnar: ColumnarFileFormat => +val sqlConf = relation.sparkSession.sessionState.conf +val columnCount = columnar.columnCountForSchema(sqlConf, requiredSchema) +withSelectedBucketsCount + ("ColumnCount" -> columnCount.toString) --- End diff -- > In that way, all other logs should be put in metadata. Who wants that? If someone wants to put metadata somewhere in the physical plan, let them open a PR and make a case for it. Otherwise, why would we put things into the plan metadata that no one has asked for? That does not make sense. > For instance, we're not even showing the actual filters (not catalyst but I mean the actual pushed filters that are going to apply at each source implementation level such as filters from ParquetFilters.createFilter) in Spark side. That's right, however that's not the aim of this PR. The aim is to ensure that plan-time schema pruning is working as expected, and I assert that that information itself is valuable enough to warrant inclusion in the data source metadata. That's speaking from experience, not conjecture. This metadata has been a _very valuable_ tool for us, and incurs little burden in practice. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22905: [SPARK-25894][SQL] Add a ColumnarFileFormat type which r...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22905 > @mallman Could you run the EXPLAIN with this new changes and post it in the PR description? Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22905: [SPARK-25894][SQL] Add a ColumnarFileFormat type ...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22905#discussion_r230442852 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ColumnarFileFormat.scala --- @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +/** + * An optional mix-in for columnar [[FileFormat]]s. This trait provides some helpful metadata when + * debugging a physical query plan. + */ +private[sql] trait ColumnarFileFormat { --- End diff -- > Mind changing it to private[datasources]? I changed it to `private[execution]` because otherwise it's inaccessible to `FileSourceScanExec` (which is in the `execution` package). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22905: [SPARK-25894][SQL] Add a ColumnarFileFormat type ...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22905#discussion_r230433746 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ColumnarFileFormat.scala --- @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +/** + * An optional mix-in for columnar [[FileFormat]]s. This trait provides some helpful metadata when + * debugging a physical query plan. + */ +private[sql] trait ColumnarFileFormat { --- End diff -- > The purpose of this info - is this purpose to check if the columns are actually being pruned or not? Exactly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22905: [SPARK-25894][SQL] Add a ColumnarFileFormat type ...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22905#discussion_r230432336 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -306,7 +306,15 @@ case class FileSourceScanExec( withOptPartitionCount } -withSelectedBucketsCount +val withOptColumnCount = relation.fileFormat match { + case columnar: ColumnarFileFormat => +val sqlConf = relation.sparkSession.sessionState.conf +val columnCount = columnar.columnCountForSchema(sqlConf, requiredSchema) +withSelectedBucketsCount + ("ColumnCount" -> columnCount.toString) --- End diff -- > The purpose of this info is to check the number of columns actually selected, and that information can be shown via logging, no? Why should it be exposed in metadata then? Yes, this information can be shown in logging, but I would say that getting access to the right log and identifying the right message is quite a bit more effort. Consider also the case of the beeline user connecting to a multiuser thriftserver. They are pretty far from the log file, whereas running an 'explain' is right there in the terminal. I think setting a log level to get this information is a bit overkill. You're going to opt in to a lot of information you don't want. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22905: [SPARK-25894][SQL] Add a ColumnarFileFormat type ...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22905#discussion_r230128199 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ColumnarFileFormat.scala --- @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +/** + * An optional mix-in for columnar [[FileFormat]]s. This trait provides some helpful metadata when + * debugging a physical query plan. + */ +private[sql] trait ColumnarFileFormat { --- End diff -- This is not meant to be exposed as an external interface for outside data sources. In fact, in making it private I intended to create something that's intentionally hidden. That it could be used more generally is possible, but I'm just looking for an abstraction with the lightest footprint that will allow its use in `FileSourceScanExec` without referencing `ParquetFileFormat` there. To reference a specific file format in `FileSourceScanExec` seems totally inappropriate. I'm hoping @dongjoon-hyun can offer his opinion on whether this can be generalized to the ORC file format. I understand that I'm making an assumption about our ability for other file formats to adopt this interface. Another purpose in making this interface private is that it makes it easy to modify to support other implementations if necessary. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
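Since the trait's method is elided in the diff above, here is a minimal sketch of what the mix-in plausibly declares; the `columnCountForSchema` name comes from its usage in `FileSourceScanExec`, and the visibility follows the comment above. Treat this as a reconstruction, not the patch itself:

```scala
package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

/**
 * An optional mix-in for columnar [[FileFormat]]s, reporting how many physical columns
 * a given (possibly pruned) Catalyst schema maps to in the underlying file format.
 */
private[execution] trait ColumnarFileFormat {
  /** Number of physical columns that will be read for the given required schema. */
  def columnCountForSchema(conf: SQLConf, schema: StructType): Int
}
```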
[GitHub] spark pull request #22905: [SPARK-25894][SQL] Add a ColumnarFileFormat type ...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22905#discussion_r230125133 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -306,7 +306,15 @@ case class FileSourceScanExec( withOptPartitionCount } -withSelectedBucketsCount +val withOptColumnCount = relation.fileFormat match { + case columnar: ColumnarFileFormat => +val sqlConf = relation.sparkSession.sessionState.conf +val columnCount = columnar.columnCountForSchema(sqlConf, requiredSchema) +withSelectedBucketsCount + ("ColumnCount" -> columnCount.toString) --- End diff -- > logging should be good enough What's your basis for this assertion? Also, what kind of logging are you suggesting? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r229743035 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter( override def start(): Unit = { var i = 0 -while (i < currentRow.numFields) { +while (i < fieldConverters.length) { fieldConverters(i).updater.start() currentRow.setNullAt(i) --- End diff -- Ok, sorry. Those links aren't coming up as code snippets. I'm following the instructions on https://help.github.com/articles/creating-a-permanent-link-to-a-code-snippet/. If anyone knows what I'm doing wrong, LMK and I'll fix it. Oddly, I did the same thing in https://github.com/apache/spark/pull/22880#discussion_r229654276 and it worked. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r229739407 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter( override def start(): Unit = { var i = 0 -while (i < currentRow.numFields) { +while (i < fieldConverters.length) { fieldConverters(i).updater.start() currentRow.setNullAt(i) --- End diff -- Ehhh... the links in the previous post didn't come through as code blocks... Don't know why... Let me try to fix that... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r229738879 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter( override def start(): Unit = { var i = 0 -while (i < currentRow.numFields) { +while (i < fieldConverters.length) { fieldConverters(i).updater.start() currentRow.setNullAt(i) --- End diff -- I can see how this is confusing. As part of the `start` method, all columns in the current row must be set to null. Some of those columns are set to null in https://github.com/apache/spark/blob/6b19f579e5424b5a8c16d6817c5a59b9828efec2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L207-L211 The rest of them are set to null in https://github.com/apache/spark/blob/6b19f579e5424b5a8c16d6817c5a59b9828efec2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L212-L215 This is equivalent to ```scala var i = 0 while (i < fieldConverters.length) { fieldConverters(i).updater.start() i += 1 } var j = 0 while (j < currentRow.numFields) { currentRow.setNullAt(j) j += 1 } ``` Is that clearer? Maybe I should rewrite the code that way. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22905: [SPARK-25894][SQL] Add a ColumnarFileFormat type which r...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22905 > is there anything blocked by this? I agree this is a good feature, but it asks the data source to provide a new ability, which may become a problem when migrating file sources to data source v2. This isn't blocking anything. It's just a contribution that has, in our experience, shown itself to be very helpful in identifying the source of performance problems. I think it would be helpful for others, too. That being said, I don't know enough about what would be involved in migrating file sources to data source v2 to say how difficult that would be. This implementation (for Parquet) is essentially a one-liner. All the heavy lifting is done by `SparkToParquetSchemaConverter`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
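A rough sketch of what that one-liner might look like inside `ParquetFileFormat` (assuming the Parquet leaf columns are counted via `MessageType.getPaths`; the actual patch may count them differently):

```scala
// Hypothetical sketch, written as a member of ParquetFileFormat with ColumnarFileFormat
// mixed in: convert the pruned Catalyst schema to a Parquet MessageType and count its
// leaf (primitive) columns.
override def columnCountForSchema(conf: SQLConf, schema: StructType): Int =
  new SparkToParquetSchemaConverter(conf).convert(schema).getPaths.size
```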
[GitHub] spark pull request #22905: [SPARK-25894][SQL] Add a ColumnarFileFormat type ...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22905#discussion_r229729687 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -306,7 +306,15 @@ case class FileSourceScanExec( withOptPartitionCount } -withSelectedBucketsCount +val withOptColumnCount = relation.fileFormat match { + case columnar: ColumnarFileFormat => +val sqlConf = relation.sparkSession.sessionState.conf +val columnCount = columnar.columnCountForSchema(sqlConf, requiredSchema) +withSelectedBucketsCount + ("ColumnCount" -> columnCount.toString) --- End diff -- You can "guess-timate" the physical column count by counting the leaf fields in the `ReadSchema` metadata value, but the true answer is an implementation detail of the file format. For example, in the implementation of `ColumnarFileFormat` for Parquet, we convert the Catalyst schema to the Parquet schema before counting columns. I suppose a similar approach would be required for ORC and other columnar formats. That being said, this new metadata value isn't really meant to provide new and essential information, _per se_. Its purpose is to provide easy-to-read, practical information that's useful for quickly validating that schema pruning is working as expected. For example, seeing that a query is reading all 423 columns from a table instead of 15 tells us pretty quickly that schema pruning is not working (unless we really are trying to read the entire table schema). I've found the `ReadSchema` value to be difficult to read in practice because of its terse syntax, and because its printout is truncated. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
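By contrast, the "guess-timate" route of counting leaf fields in the Catalyst `ReadSchema` could be sketched as below; it is format-agnostic, which is precisely why a format-aware `columnCountForSchema` gives the more truthful number:

```scala
import org.apache.spark.sql.types._

// Count leaf fields of a Catalyst schema by recursing into nested types. This ignores
// how a specific file format physically lays out arrays and maps, so it is only an
// approximation of the physical column count.
def leafFieldCount(dt: DataType): Int = dt match {
  case s: StructType => s.fields.map(f => leafFieldCount(f.dataType)).sum
  case a: ArrayType  => leafFieldCount(a.elementType)
  case m: MapType    => leafFieldCount(m.keyType) + leafFieldCount(m.valueType)
  case _             => 1
}

// leafFieldCount(requiredSchema) would approximate the proposed ColumnCount value.
```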
[GitHub] spark issue #22905: [SPARK-25894][SQL] Add a ColumnarFileFormat type which r...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22905 @gatorsmile @viirya @cloud-fan @dbtsai your thoughts? cc @dongjoon-hyun for ORC file format perspective. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22905: [SPARK-25894][SQL] Add a ColumnarFileFormat type ...
GitHub user mallman opened a pull request: https://github.com/apache/spark/pull/22905 [SPARK-25894][SQL] Add a ColumnarFileFormat type which returns the column count for a given schema (link to Jira: https://issues.apache.org/jira/browse/SPARK-25894) ## What changes were proposed in this pull request? Knowing the number of physical columns Spark will read from a columnar file format (such as Parquet) is extremely helpful (if not critical) in validating an assumption about that number of columns based on a given query. For example, take a `contacts` table with a `name` struct column like `(name.first, name.last)`. Without schema pruning the following query reads both columns in the name struct: ``` select name.first from contacts ``` With schema pruning, the same query reads only the `name.first` column. This PR includes an additional metadata field for `FileSourceScanExec` which identifies the number of columns Spark will read from that file source. This metadata will be printed as part of a physical plan explanation. ## How was this patch tested? A new test was added to `ParquetSchemaPruningSuite.scala`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/VideoAmp/spark-public spark-25894-file_source_scan_exec_column_count_metadata Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22905.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22905 commit 4aa8d0454be723f8318e1d0a3ea4e4c138ed5861 Author: Michael Allman Date: 2018-10-31T12:27:00Z Add a ColumnarFileFormat type and implementation for ParquetFileFormat which specifies a method for returning the physical column count associated with a given StructType. We include this count as metadata in FileSourceScanExec --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
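A small, runnable illustration of the scenario in the description; the path and the `ColumnCount` values mentioned in the comments are hypothetical:

```scala
// In spark-shell:
import spark.implicits._

case class FullName(first: String, last: String)
case class Contact(id: Int, name: FullName)

Seq(Contact(0, FullName("Jane", "Doe"))).toDF
  .write.mode("overwrite").parquet("/tmp/contacts")

spark.read.parquet("/tmp/contacts").selectExpr("name.first").explain()
// Without nested schema pruning the scan reads both leaves of `name`
// (ColumnCount: 2, hypothetically); with pruning enabled it reads only
// `name.first` (ColumnCount: 1).
```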
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r229654276 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter( override def start(): Unit = { var i = 0 -while (i < currentRow.numFields) { +while (i < fieldConverters.length) { fieldConverters(i).updater.start() currentRow.setNullAt(i) --- End diff -- Yes. The following while loop at https://github.com/apache/spark/blob/6b19f579e5424b5a8c16d6817c5a59b9828efec2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L212-L215 ensures all remaining columns/fields in the current row are nulled-out. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r229451788 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter( override def start(): Unit = { var i = 0 -while (i < currentRow.numFields) { +while (i < fieldConverters.length) { fieldConverters(i).updater.start() currentRow.setNullAt(i) --- End diff -- That is correct. Now that we're passing a Parquet schema that's a (non-strict) subset of the Catalyst schema, we cannot assume that their fields are in 1:1 correspondence. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r229451108 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -182,18 +182,20 @@ private[parquet] class ParquetRowConverter( // Converters for each field. private val fieldConverters: Array[Converter with HasParentContainerUpdater] = { -parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map { - case ((parquetFieldType, catalystField), ordinal) => -// Converted field value should be set to the `ordinal`-th cell of `currentRow` -newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal)) +parquetType.getFields.asScala.map { + case parquetField => --- End diff -- You're right. I will include this change in a future push to this branch. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
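To make the direction of this change concrete, here is a sketch of how `ParquetRowConverter` might bind each Parquet field's converter to the matching Catalyst field by name instead of by position. This is an illustrative reconstruction that assumes case-sensitive name resolution, not the literal patch:

```scala
// Inside ParquetRowConverter: one converter per Parquet field, each writing into the
// ordinal of the Catalyst field with the same name, so the two schemas no longer need
// to be in positional 1:1 correspondence.
private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
  parquetType.getFields.asScala.map { parquetField =>
    val fieldIndex = catalystType.fieldIndex(parquetField.getName)
    val catalystField = catalystType(fieldIndex)
    // Converted field value is set on the `fieldIndex`-th cell of `currentRow`.
    newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex))
  }.toArray
}
```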
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r229450720 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -93,13 +141,14 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) log.debug(s"Preparing for read Parquet file with message type: $fileSchema") val parquetRequestedSchema = readContext.getRequestedSchema -logInfo { - s"""Going to read the following fields from the Parquet file: - | - |Parquet form: +log.info { + s"""Going to read the following fields from the Parquet file with the following schema: + |Parquet file schema: + |$fileSchema + |Parquet read schema: --- End diff -- This detailed, formatted information was very helpful in developing and debugging this patch. Perhaps this should be logged at the debug level instead? Even the original message does seem rather technical for info-level logging. What do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r229449812 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -49,34 +49,82 @@ import org.apache.spark.sql.types._ * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] * to [[prepareForRead()]], but use a private `var` for simplicity. */ -private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) +private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], + usingVectorizedReader: Boolean) extends ReadSupport[UnsafeRow] with Logging { private var catalystRequestedSchema: StructType = _ def this() { // We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only // used in the vectorized reader, where we get the convertTz value directly, and the value here // is ignored. -this(None) +this(None, usingVectorizedReader = true) } /** * Called on executor side before [[prepareForRead()]] and instantiating actual Parquet record * readers. Responsible for figuring out Parquet requested schema used for column pruning. */ override def init(context: InitContext): ReadContext = { +val conf = context.getConfiguration catalystRequestedSchema = { - val conf = context.getConfiguration val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA) assert(schemaString != null, "Parquet requested schema not set.") StructType.fromString(schemaString) } -val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key, +val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, + SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get) +val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValue.get) -val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema( - context.getFileSchema, catalystRequestedSchema, caseSensitive) - +val parquetFileSchema = context.getFileSchema +val parquetClippedSchema = ParquetReadSupport.clipParquetSchema(parquetFileSchema, + catalystRequestedSchema, caseSensitive) + +// As part of schema clipping, we add fields in catalystRequestedSchema which are missing +// from parquetFileSchema to parquetClippedSchema. However, nested schema pruning requires +// we ignore unrequested field data when reading from a Parquet file. Therefore we pass two +// schema to ParquetRecordMaterializer: the schema of the file data we want to read +// (parquetRequestedSchema), and the schema of the rows we want to return +// (catalystRequestedSchema). The reader is responsible for reconciling the differences between +// the two. +// +// Aside from checking whether schema pruning is enabled (schemaPruningEnabled), there +// is an additional complication to constructing parquetRequestedSchema. The manner in which +// Spark's two Parquet readers reconcile the differences between parquetRequestedSchema and +// catalystRequestedSchema differ. Spark's vectorized reader does not (currently) support +// reading Parquet files with complex types in their schema. Further, it assumes that +// parquetRequestedSchema includes all fields requested in catalystRequestedSchema. It includes +// logic in its read path to skip fields in parquetRequestedSchema which are not present in the +// file. +// +// Spark's parquet-mr based reader supports reading Parquet files of any kind of complex +// schema, and it supports nested schema pruning as well. 
Unlike the vectorized reader, the +// parquet-mr reader requires that parquetRequestedSchema include only those fields present in +// the underlying parquetFileSchema. Therefore, in the case where we use the parquet-mr reader +// we intersect the parquetClippedSchema with the parquetFileSchema to construct the +// parquetRequestedSchema set in the ReadContext. --- End diff -- > For vectorized reader, even we do this additional `intersectParquetGroups`, will it cause any problem? Yes. The relevant passage being ``` Further, [the vectorized reader] assumes that parquetRequestedSchema includes all fields requested in catalystRequestedSchema. It includes logic in its read path to skip fields in parquetRequestedSchema which are not present in the file. ``` If we break this assumption by giving the
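A sketch of the reader-dependent requested-schema selection described above, using the names that appear in the diff and assuming `intersectParquetGroups` yields the intersected group as an `Option`; treat it as an approximation of the logic, not the literal patch:

```scala
// parquet-mr path with pruning: the requested schema may only contain fields that
// actually exist in the file, so intersect the clipped schema with the file schema.
// Vectorized path: keep the clipped schema, whose requested-but-missing fields the
// vectorized reader already knows how to skip.
val parquetRequestedSchema =
  if (schemaPruningEnabled && !usingVectorizedReader) {
    ParquetReadSupport.intersectParquetGroups(parquetClippedSchema, parquetFileSchema)
      .map(group => new MessageType(group.getName, group.getFields))
      .getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE)
  } else {
    parquetClippedSchema
  }
```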
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > https://issues.apache.org/jira/browse/SPARK-25879 > > If we select a nested field and a top level field, the schema pruning will fail. Here is the reproducible test, > ... Hi @dbtsai. I believe the problem you're seeing here is resolved by #22880 (https://issues.apache.org/jira/browse/SPARK-25407). It was a known problem at the time this PR was merged, but was pushed back to a future commit. Coincidentally, I just posted #22880 today. The test case you provide is very similar to the test case introduced and exercised in that PR. I manually ran your test case on that branch locally, and the test passed. Would you like to try that branch and comment? Cheers. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
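For reference, the failing shape described in SPARK-25879 is simply a projection that mixes a nested field with a top-level field; reusing the contacts data from the earlier sketch, it looks like this (illustrative only):

```scala
// Nested field plus top-level field in one projection: the pattern reported to break
// schema pruning before the fix in #22880.
spark.read.parquet("/tmp/contacts").select("name.first", "id").show()
```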
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
GitHub user mallman opened a pull request: https://github.com/apache/spark/pull/22880 [SPARK-25407][SQL] Ensure we pass a compatible pruned schema to ParquetRowConverter ## What changes were proposed in this pull request? (Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-25407) As part of schema clipping in `ParquetReadSupport.scala`, we add fields in the Catalyst requested schema which are missing from the Parquet file schema to the Parquet clipped schema. However, nested schema pruning requires we ignore unrequested field data when reading from a Parquet file. Therefore we pass two schema to `ParquetRecordMaterializer`: the schema of the file data we want to read and the schema of the rows we want to return. The reader is responsible for reconciling the differences between the two. Aside from checking whether schema pruning is enabled, there is an additional complication to constructing the Parquet requested schema. The manner in which Spark's two Parquet readers reconcile the differences between the Parquet requested schema and the Catalyst requested schema differ. Spark's vectorized reader does not (currently) support reading Parquet files with complex types in their schema. Further, it assumes that the Parquet requested schema includes all fields requested in the Catalyst requested schema. It includes logic in its read path to skip fields in the Parquet requested schema which are not present in the file. Spark's parquet-mr based reader supports reading Parquet files of any kind of complex schema, and it supports nested schema pruning as well. Unlike the vectorized reader, the parquet-mr reader requires that the Parquet requested schema include only those fields present in the underlying Parquet file's schema. Therefore, in the case where we use the parquet-mr reader we intersect the Parquet clipped schema with the Parquet file's schema to construct the Parquet requested schema that's set in the `ReadContext`. ## How was this patch tested? A previously ignored test case which exercises the failure scenario this PR addresses has been enabled. You can merge this pull request into a Git repository by running: $ git pull https://github.com/VideoAmp/spark-public spark-25407-parquet_column_pruning-fix_ignored_pruning_test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22880.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22880 commit e5e60ad2d9c130050925220eb4ae93ae3c949e95 Author: Michael Allman Date: 2018-08-15T23:48:25Z Ensure we pass a compatible pruned schema to ParquetRowConverter --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/19410 Hi @szhem. I'm sorry I haven't been more responsive here. I can relate to your frustration, and I do want to help you make progress on this PR and merge it in. I have indeed been busy with other responsibilities, but I can rededicate time to reviewing this PR. Of all the approaches you've proposed so far, I like the `ContextCleaner`-based one the best. Personally, I'm okay with setting `spark.cleaner.referenceTracking.cleanCheckpoints` to `true` by default for the next major Spark release and documenting this change of behavior in the release notes. However, that may not be okay with the senior maintainers. As an alternative I wonder if we could instead create a new config just for graph RDD checkpoint cleaning such as `spark.cleaner.referenceTracking.cleanGraphCheckpoints` and set that to `true` by default. Then use that config value in `PeriodicGraphCheckpointer` instead of `spark.cleaner.referenceTracking.cleanCheckpoints`. Would you be willing to open another PR with your `ContextCleaner`-based approach? I'm not suggesting you close this PR. We can call each PR alternative solutions for the same JIRA ticket and cross-reference each PR. If you do that then I will try to debug the problem with the retained checkpoints. Thank you. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
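For context, the existing cleaner flag discussed above can be enabled today as shown below; the graph-specific key in the commented-out line is only the hypothetical alternative proposed in this comment, not an existing Spark setting:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("graph-checkpoint-cleaning")
  // Existing setting: let ContextCleaner delete checkpoint data once the
  // checkpointed RDDs are garbage collected.
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  // Hypothetical, per the suggestion above: a graph-only variant that
  // PeriodicGraphCheckpointer would consult instead of the global flag.
  // .config("spark.cleaner.referenceTracking.cleanGraphCheckpoints", "true")
  .getOrCreate()
```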
[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r223441744 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala --- @@ -79,12 +82,30 @@ class HiveClientSuite(version: String) client = init(true) } - test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") { -val client = init(false) -val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"), - Seq(attr("ds") === 20170101)) + test(s"getPartitionsByFilter returns all partitions when $partPruningFallbackKey=true") { + withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ENABLED.key -> "true", +SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> "true") { + val client = init(false) + // tryDirectSql = false and a non-string partition filter will always fail. This condition + // is used to test if the fallback works + val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"), +Seq(attr("ds") === 20170101)) -assert(filteredPartitions.size == testPartitionCount) + assert(filteredPartitions.size == testPartitionCount) +} + } + + test(s"getPartitionsByFilter should throw an exception if $partPruningFallbackKey=false") { --- End diff -- The test name states that `getPartitionsByFilter` should throw an exception if partition pruning fallback is disabled. But that's not right. I think we need an accurate name for this and the previous test. Perhaps it should include a mention that the underlying call to the metastore throws an exception. How about ``` s"getPartitionsByFilter should throw an exception if the underlying call to the metastore throws an exception and $partPruningFallbackKey=false" ``` ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r223425011 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala --- @@ -79,12 +82,30 @@ class HiveClientSuite(version: String) client = init(true) } - test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") { -val client = init(false) -val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"), - Seq(attr("ds") === 20170101)) + test(s"getPartitionsByFilter returns all partitions when $partPruningFallbackKey=true") { + withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ENABLED.key -> "true", +SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> "true") { + val client = init(false) + // tryDirectSql = false and a non-string partition filter will always fail. This condition + // is used to test if the fallback works + val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"), +Seq(attr("ds") === 20170101)) -assert(filteredPartitions.size == testPartitionCount) + assert(filteredPartitions.size == testPartitionCount) +} + } + + test(s"getPartitionsByFilter should throw an exception if $partPruningFallbackKey=false") { --- End diff -- Change test name to ``` s"getPartitionsByFilter should throw an exception when $tryDirectSqlKey=false and $partPruningFallbackKey=false" ``` ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r223424625 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala --- @@ -79,12 +82,30 @@ class HiveClientSuite(version: String) client = init(true) } - test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") { -val client = init(false) -val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"), - Seq(attr("ds") === 20170101)) + test(s"getPartitionsByFilter returns all partitions when $partPruningFallbackKey=true") { --- End diff -- Change test name to ``` s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false and $partPruningFallbackKey=true" ``` ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r223422030 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -746,34 +746,45 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]] } else { logDebug(s"Hive metastore filter is '$filter'.") +val shouldFallback = SQLConf.get.metastorePartitionPruningFallback val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL // We should get this config value from the metaStore. otherwise hit SPARK-18681. // To be compatible with hive-0.12 and hive-0.13, In the future we can achieve this by: // val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean try { - // Hive may throw an exception when calling this method in some circumstances, such as - // when filtering on a non-string partition column when the hive config key - // hive.metastore.try.direct.sql is false getPartitionsByFilterMethod.invoke(hive, table, filter) .asInstanceOf[JArrayList[Partition]] } catch { - case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] && - !tryDirectSql => -logWarning("Caught Hive MetaException attempting to get partition metadata by " + - "filter from Hive. Falling back to fetching all partition metadata, which will " + - "degrade performance. Modifying your Hive metastore configuration to set " + - s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex) -// HiveShim clients are expected to handle a superset of the requested partitions -getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]] - case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] && - tryDirectSql => -throw new RuntimeException("Caught Hive MetaException attempting to get partition " + - "metadata by filter from Hive. You can set the Spark configuration setting " + - s"${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to false to work around this " + - "problem, however this will result in degraded performance. Please report a bug: " + - "https://issues.apache.org/jira/browse/SPARK";, ex) + case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] => +if (shouldFallback) { + if (!tryDirectSql) { +logWarning("Caught Hive MetaException attempting to get partition metadata by " + + "filter from Hive. Falling back to fetching all partition metadata, which will " + + "degrade performance. Modifying your Hive metastore configuration to set " + + s"${tryDirectSqlConfVar.varname} to true may resolve this problem.") + } else { +logWarning("Caught Hive MetaException attempting to get partition metadata " + + "by filter from Hive. Hive metastore's direct SQL feature has been enabled, " + + "but it is an optimistic optimization and not guaranteed to work. Falling back " + + "to fetching all partition metadata, which will degrade performance (for the " + + "current query). If you see this error consistently, you can set the Spark " + + s"configuration setting ${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to " + + "false as a work around, however this will result in degraded performance. 
" + + "Please report a bug to Hive stating that direct SQL is failing consistently " + + "for the specified query: https://issues.apache.org/jira/browse/HIVE";) --- End diff -- I think we should remove the suggestion to file a Hive project bug. Even with the direct SQL configuration setting enabled, there are valid metastore deployments for which it will be ignored. For example, my understanding is that if the metastore uses MongoDB for its underlying storage, the direct SQL configurati
[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r223415835 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -754,26 +755,38 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean try { // Hive may throw an exception when calling this method in some circumstances, such as - // when filtering on a non-string partition column when the hive config key - // hive.metastore.try.direct.sql is false + // when filtering on a non-string partition column. --- End diff -- To me your revised comment suggests that if we try to filter on a non-string partition column we can expect the call to `getPartitionsByFilter` to fail. That's not true. Although I wrote the original comment, I can see how making an assumption about Hive's behavior when calling this method is rather specious and unwise. I suggest we just remove this comment entirely. To simply state that ``` Hive may throw an exception when calling this method in some circumstances. ``` only states the obvious: of course any method call may throw an exception in some circumstances. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r222348674 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -746,34 +746,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]] } else { logDebug(s"Hive metastore filter is '$filter'.") -val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL -// We should get this config value from the metaStore. otherwise hit SPARK-18681. -// To be compatible with hive-0.12 and hive-0.13, In the future we can achieve this by: -// val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean -val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname, - tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean try { // Hive may throw an exception when calling this method in some circumstances, such as - // when filtering on a non-string partition column when the hive config key - // hive.metastore.try.direct.sql is false + // when filtering on a non-string partition column. getPartitionsByFilterMethod.invoke(hive, table, filter) .asInstanceOf[JArrayList[Partition]] } catch { - case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] && - !tryDirectSql => + case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] => logWarning("Caught Hive MetaException attempting to get partition metadata by " + "filter from Hive. Falling back to fetching all partition metadata, which will " + - "degrade performance. Modifying your Hive metastore configuration to set " + - s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex) + "degrade performance. Enable direct SQL mode in hive metastore to attempt " + + "to improve performance. However, Hive's direct SQL mode is an optimistic " + + "optimization and does not guarantee improved performance.") --- End diff -- > sometimes failing is better than spending several hours to get all partitions. Shall we add a config to switch the behavior? I think that's what `${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS}` is for. @kmanamcheri, what happens if you set this to false? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r222345462 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -746,34 +746,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]] } else { logDebug(s"Hive metastore filter is '$filter'.") -val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL -// We should get this config value from the metaStore. otherwise hit SPARK-18681. -// To be compatible with hive-0.12 and hive-0.13, In the future we can achieve this by: -// val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean -val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname, - tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean try { // Hive may throw an exception when calling this method in some circumstances, such as - // when filtering on a non-string partition column when the hive config key - // hive.metastore.try.direct.sql is false + // when filtering on a non-string partition column. getPartitionsByFilterMethod.invoke(hive, table, filter) .asInstanceOf[JArrayList[Partition]] } catch { - case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] && - !tryDirectSql => + case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] => logWarning("Caught Hive MetaException attempting to get partition metadata by " + "filter from Hive. Falling back to fetching all partition metadata, which will " + - "degrade performance. Modifying your Hive metastore configuration to set " + - s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex) + "degrade performance. Enable direct SQL mode in hive metastore to attempt " + + "to improve performance. However, Hive's direct SQL mode is an optimistic " + + "optimization and does not guarantee improved performance.") --- End diff -- I think the original warning message is more accurate. Direct sql mode isn't just about performance. It's also about enhanced capability, e.g. supporting filtering on non-string type columns. As the original comment states, setting the direct sql config value to true may resolve a problem around metastore-side partition filtering. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
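To ground the "enhanced capability" point: the suite's own scenario is a predicate on a non-string partition column, which is exactly where direct SQL matters beyond performance. A sketch reusing the suite's `client` and `attr` helpers:

```scala
// `ds` is an integer partition column in HiveClientSuite's test table. With direct SQL
// disabled on the metastore, this metastore-side filter can throw a MetaException,
// which is the failure the fallback-versus-fail discussion above is about.
val table = client.getTable("default", "test")
val partitions = client.getPartitionsByFilter(table, Seq(attr("ds") === 20170101))
```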
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 Hi @Gauravshah. That branch has diverged substantially from what's in master. Right now I'm preparing a PR to address a problem with the current implementation in master, but I'm on holiday for a while. Still, I am hopeful we will see schema pruning for joins and aggregations in 3.0. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22394: [SPARK-25406][SQL] For ParquetSchemaPruningSuite.scala, ...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22394 Retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22394: [SPARK-25406][SQL] For ParquetSchemaPruningSuite.scala, ...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22394 > Hey @mallman, let's just target to fix the problem in the JIRA without other refactorings. The refactorings I've made address the problem directly. Hopefully that will be clearer with my most recent commit. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22394: [SPARK-25406][SQL] For ParquetSchemaPruningSuite....
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22394#discussion_r217055207 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala --- @@ -245,28 +249,32 @@ class ParquetSchemaPruningSuite checkAnswer(query.orderBy("id"), Row(1) :: Nil) } - private def testMixedCasePruning(testName: String)(testThunk: => Unit) { -withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", - SQLConf.CASE_SENSITIVE.key -> "true") { - test(s"Spark vectorized reader - case-sensitive parser - mixed-case schema - $testName") { - withMixedCaseData(testThunk) + private def testExactCasePruning(testName: String)(testThunk: => Unit) { +test(s"Spark vectorized reader - case-sensitive parser - mixed-case schema - $testName") { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", +SQLConf.CASE_SENSITIVE.key -> "true") { +withMixedCaseData(testThunk) } } -withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false", - SQLConf.CASE_SENSITIVE.key -> "false") { - test(s"Parquet-mr reader - case-insensitive parser - mixed-case schema - $testName") { +test(s"Parquet-mr reader - case-sensitive parser - mixed-case schema - $testName") { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false", +SQLConf.CASE_SENSITIVE.key -> "true") { withMixedCaseData(testThunk) } } -withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", - SQLConf.CASE_SENSITIVE.key -> "false") { - test(s"Spark vectorized reader - case-insensitive parser - mixed-case schema - $testName") { - withMixedCaseData(testThunk) +testMixedCasePruning(testName)(testThunk) + } + + private def testMixedCasePruning(testName: String)(testThunk: => Unit) { --- End diff -- Previously, this method ran `testThunk` with `SQLConf.CASE_SENSITIVE.key` set to true and false. That was a mistake and incorrect. For example, the query select col1, col2.b from mixedcase will fail if `SQLConf.CASE_SENSITIVE.key` is set to true. That mistake was causing 6 test cases to fail. Therefore, I moved the code that tests with a case-sensitive parser out of `testMixedCasePruning` into `testExactCasePruning` and included a call to `testMixedCasePruning` in `testExactCasePruning`. I'll push a commit that refactors the method names and add code comments that will make this clearer. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22394: [SPARK-25406][SQL] For ParquetSchemaPruningSuite....
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22394#discussion_r217052036 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala --- @@ -245,28 +249,32 @@ class ParquetSchemaPruningSuite checkAnswer(query.orderBy("id"), Row(1) :: Nil) } - private def testMixedCasePruning(testName: String)(testThunk: => Unit) { -withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", - SQLConf.CASE_SENSITIVE.key -> "true") { - test(s"Spark vectorized reader - case-sensitive parser - mixed-case schema - $testName") { - withMixedCaseData(testThunk) + private def testExactCasePruning(testName: String)(testThunk: => Unit) { --- End diff -- The method names are meant to clarify the kind of queries being tested, not the setting of `SQLConf.CASE_SENSITIVE.key`. In this case, `testExactCasePruning` is supposed to mean that we're passing in a test in which the column names in the query are exactly the same as the column names in the relation. It's not a very good name in that sense. I'll try to make it clearer and add a code comment to clarify. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22357: [SPARK-25363][SQL] Fix schema pruning in where clause by...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22357 > FYI, @mallman I'm working on having ParquetFilter to support IsNotNull(employer.id) to be pushed into parquet reader. That would be pretty cool. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22357: [SPARK-25363][SQL] Fix schema pruning in where clause by...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22357 And FYI this is the Jira issue I promised in https://github.com/apache/spark/pull/22357#issuecomment-419940228 yesterday: https://issues.apache.org/jira/browse/SPARK-25407. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22394: [SPARK-25406][SQL] For ParquetSchemaPruningSuite.scala, ...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22394 FYI @viirya @dbtsai @gatorsmile @HyukjinKwon Can I get someone's review of this PR please? The unmasked failures appear to be false positives, so no changes to the tested code are required, just changes to the tests themselves. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22357: [SPARK-25363][SQL] Fix schema pruning in where clause by...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22357 This LGTM. I'm not going to submit a PR for my approach to this problem. Thanks @viirya! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22357: [SPARK-25363][SQL] Fix schema pruning in where cl...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22357#discussion_r216714387 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -110,7 +110,17 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { val projectionRootFields = projects.flatMap(getRootFields) val filterRootFields = filters.flatMap(getRootFields) -(projectionRootFields ++ filterRootFields).distinct +// Kind of expressions don't need to access any fields of a root fields, e.g., `IsNotNull`. +// For them, if there are any nested fields accessed in the query, we don't need to add root +// field access of above expressions. +// For example, for a query `SELECT name.first FROM contacts WHERE name IS NOT NULL`, +// we don't need to read nested fields of `name` struct other than `first` field. --- End diff -- Yeah, I'm okay with leaving it as-is. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22394: [SPARK-25406][SQL] For ParquetSchemaPruningSuite.scala, ...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22394 I'm working on fixing these test failures now. Hopefully I'll have something pushed soon. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22357: [SPARK-25363][SQL] Fix schema pruning in where clause by...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22357 FYI, the PR I previously mentioned about fixing the use of `withSQLConf` is #22394. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22394: [SPARK-25406][SQL] For ParquetSchemaPruningSuite....
GitHub user mallman opened a pull request: https://github.com/apache/spark/pull/22394 [SPARK-25406][SQL] For ParquetSchemaPruningSuite.scala, move calls to `withSQLConf` inside calls to `test` (Link to Jira: https://issues.apache.org/jira/browse/SPARK-25406) ## What changes were proposed in this pull request? The current use of `withSQLConf` in `ParquetSchemaPruningSuite.scala` is incorrect. The desired configuration settings are not being set when running the test cases. This PR fixes that defective usage and addresses the test failures that were previously masked by that defect. ## How was this patch tested? I added code to relevant test cases to print the expected SQL configuration settings and found that the settings were not being set as expected. When I changed the order of calls to `test` and `withSQLConf` I found that the configuration settings were being set as expected. You can merge this pull request into a Git repository by running: $ git pull https://github.com/VideoAmp/spark-public spark-25406-fix_broken_schema_pruning_tests Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22394.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22394 commit 8cca76b7ec31c1c0cff1bd5b6772b43b49c30147 Author: Michael Allman Date: 2018-09-11T15:10:08Z For ParquetSchemaPruningSuite.scala, move calls to `withSQLConf` inside calls to `test` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
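The defect here is one of ordering: in these suites `withSQLConf` only overrides the session conf for the duration of its body, so wrapping a `test(...)` registration in it applies the conf while ScalaTest registers the test, not while the test body runs. A minimal sketch of the broken and corrected shapes, assuming the usual `QueryTest`/`SharedSQLContext` trait stack used by the real suite; the suite name, test names, and assertions below are made up for illustration.
```scala
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

// Illustrative suite, not the real ParquetSchemaPruningSuite.
class ConfOrderingIllustrationSuite extends QueryTest with SharedSQLContext {

  // Broken shape: withSQLConf sets the conf only while `test` *registers* the
  // test case; by the time ScalaTest runs the body, the conf has been restored.
  withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
    test("registered with conf, but run without it") {
      // the session default ("false") is what's actually in effect here
      assert(spark.conf.get(SQLConf.CASE_SENSITIVE.key) == "false")
    }
  }

  // Corrected shape: the conf is set inside the test body, so it is in effect
  // when the assertions actually execute.
  test("conf set where the body runs") {
    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
      assert(spark.conf.get(SQLConf.CASE_SENSITIVE.key) == "true")
    }
  }
}
```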
[GitHub] spark issue #22357: [SPARK-25363][SQL] Fix schema pruning in where clause by...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22357 I have some bad news. The methods `testSchemaPruning` and `testMixedCasePruning` do not set the configuration settings as expected. Fixing that reveals 6 failing tests among the mixed-case tests. One of those failing tests checks the scan and the answer for a query with a filter condition. Based on what I'm seeing, I think it's fair to say that schema pruning is broken under certain circumstances when using a table schema that includes column names with upper-case characters (note that the test schema for contacts in `ParquetSchemaPruningSuite.scala` includes no fields with upper-case characters). Fortunately schema pruning is disabled by default, and I think it's still considered "experimental" technology. I think that fixing `ParquetSchemaPruningSuite.scala` is pretty straightforward. Fixing the newly failing unit tests will be more effort. In any case, I will create an issue in Jira and submit a PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22357: [SPARK-25363][SQL] Fix schema pruning in where clause by...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22357 @viirya Please amend https://github.com/apache/spark/blob/d684a0f30599d50061ef78ec62edcdd3b726e2d9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala#L303-L306 to remove the explanatory comment, and uncomment the commented-out line of code. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22357: [SPARK-25363][SQL] Fix schema pruning in where cl...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22357#discussion_r216686762 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala --- @@ -155,6 +163,60 @@ class ParquetSchemaPruningSuite Row(null) :: Row(null) :: Nil) } + testSchemaPruning("select a single complex field and in where clause") { +val query1 = sql("select name.first from contacts where name.first = 'Jane'") +checkScan(query1, "struct>") +checkAnswer(query1, Row("Jane") :: Nil) + +val query2 = sql("select name.first, name.last from contacts where name.first = 'Jane'") +checkScan(query2, "struct>") +checkAnswer(query2, Row("Jane", "Doe") :: Nil) + +val query3 = sql("select name.first from contacts " + + "where employer.company.name = 'abc' and p = 1") +checkScan(query3, "struct," + + "employer:struct>>") +checkAnswer(query3, Row("Jane") :: Nil) + +val query4 = sql("select name.first, employer.company.name from contacts " + + "where employer.company is not null and p = 1") +checkScan(query4, "struct," + + "employer:struct>>") +checkAnswer(query4, Row("Jane", "abc") :: Nil) + } + + testSchemaPruning("select nullable complex field and having is null predicate") { --- End diff -- Do you mean `having is not null predicate`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22357: [SPARK-25363][SQL] Fix schema pruning in where cl...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22357#discussion_r216683076 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -110,7 +110,17 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { val projectionRootFields = projects.flatMap(getRootFields) val filterRootFields = filters.flatMap(getRootFields) -(projectionRootFields ++ filterRootFields).distinct +// Kind of expressions don't need to access any fields of a root fields, e.g., `IsNotNull`. +// For them, if there are any nested fields accessed in the query, we don't need to add root +// field access of above expressions. +// For example, for a query `SELECT name.first FROM contacts WHERE name IS NOT NULL`, +// we don't need to read nested fields of `name` struct other than `first` field. --- End diff -- @viirya, I see your point about the difference between a complex type being null and a subfield being null. So to answer the following query, `select address from contacts where name is not null`, do we need to read any of the fields in `name`? Or perhaps just read one arbitrary field of simple type, like `name.first`? That's surprising, but I'm starting to believe it's true. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
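One way to see the distinction @viirya is drawing: a struct value can be non-null even when every one of its fields is null, so `name IS NOT NULL` and `name.first IS NOT NULL` are genuinely different predicates. A small self-contained sketch (the data and view name here are made up, not the suite's contacts fixture):
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("struct-null-demo").getOrCreate()
import spark.implicits._

case class Name(first: String, last: String)
case class Contact(id: Int, name: Name)

// id 0 has a non-null struct whose fields are all null; id 1 has a null struct.
Seq(Contact(0, Name(null, null)), Contact(1, null)).toDF.createOrReplaceTempView("contacts_demo")

spark.sql("select id from contacts_demo where name is not null").show()
// expected: only id 0, even though name.first and name.last are both null
spark.sql("select id from contacts_demo where name.first is not null").show()
// expected: no rows
```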
[GitHub] spark pull request #22357: [SPARK-25363][SQL] Fix schema pruning in where cl...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22357#discussion_r216545091 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -110,7 +110,17 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { val projectionRootFields = projects.flatMap(getRootFields) val filterRootFields = filters.flatMap(getRootFields) -(projectionRootFields ++ filterRootFields).distinct +// Kind of expressions don't need to access any fields of a root fields, e.g., `IsNotNull`. +// For them, if there are any nested fields accessed in the query, we don't need to add root +// field access of above expressions. +// For example, for a query `SELECT name.first FROM contacts WHERE name IS NOT NULL`, +// we don't need to read nested fields of `name` struct other than `first` field. --- End diff -- I'm having trouble accepting this, but perhaps I'm reading too much into it (or not enough). Let me illustrate with a couple of queries and their physical plans. Assuming the data model in `ParquetSchemaPruningSuite.scala`, the physical plan for the query select employer.id from contacts where employer is not null is ``` == Physical Plan == *(1) Project [employer#36.id AS id#46] +- *(1) Filter isnotnull(employer#36) +- *(1) FileScan parquet [employer#36,p#37] Batched: false, Format: Parquet, PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(employer)], ReadSchema: struct> ``` The physical plan for the query select employer.id from contacts where employer.id is not null is ``` == Physical Plan == *(1) Project [employer#36.id AS id#47] +- *(1) Filter (isnotnull(employer#36) && isnotnull(employer#36.id)) +- *(1) FileScan parquet [employer#36,p#37] Batched: false, Format: Parquet, PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(employer)], ReadSchema: struct> ``` The read schemata are the same, but the query filters are not. The file scan for the second query looks as I would expect, but the scan for the first query appears to only read `employer.id` even though it needs to check `employer is not null`. If it only reads `employer.id`, how does it check that `employer.company` is not null? Perhaps `employer.id` is null but `employer.company` is not null for some row... I have run some tests to validate that this PR is returning the correct results for both queries, and it is. But I don't understand why. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22357: [SPARK-25363][SQL] Fix schema pruning in where clause by...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22357 > FYI, per further checking code and discussion with @dbtsai regarding with predicate pushdown, we know that predicate push down only works for primitive types on Parquet datasource. So both `IsNotNull(employer)` and `IsNotNull(employer.id)` are not actually pushed down to work at Parquet reader I would expect `IsNotNull(employer.id)` to be pushed down. In any case, I misunderstood what that `PushedFilters` metadata item means in the `FileScan` part of the physical plan. I thought that was a Parquet filter, but sometimes it is not. In any case, I'm not concerned about supporting filter push down at this point. My concern is around its side effects, but that has been allayed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22357: [SPARK-25363][SQL] Fix schema pruning in where clause by...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22357 > @mallman It will be great that we can have this fix in 2.4 release as this can dramatically reduce the data being read in many applications which is the purpose of the original work. I agree it would be great to have this capability in 2.4. But I don't know that this PR is the right way to accomplish our intended goal. I'm also not sure this patch accomplishes its intended goal. And I would like time to complete my review; I'm still running tests against this patch. I would also like to submit my patch as an alternative for review, because the approaches taken by this PR and by my patch are not compatible. Even though it's incomplete, I'm willing to submit it as-is with some notes on how it's incomplete and what needs to be done. However, I can say for certain there is no way it would be accepted for Spark 2.4. The earliest I could get it submitted is tomorrow morning (EDT). However, to give you a sense of how my patch works, I can give you the gist of how I see the problem. Basically, constraint propagation as defined in `QueryPlanConstraints.scala` inhibits schema pruning. Indeed, if you turn off constraint propagation (by setting `spark.sql.constraintPropagation.enabled` to `false`), the query `select employer.id from contacts where employer.id = 0` produces the following physical plan ``` == Physical Plan == *(1) Project [employer#36.id AS id#47] +- *(1) Filter (employer#36.id = 0) +- *(1) FileScan parquet [employer#36,p#37] Batched: false, Format: Parquet, PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct> ``` without applying _either_ patch. (FYI I ran this on the master branch, commit 12e3e9f17dca11a2cddf0fb99d72b4b97517fb56). The only column read in this plan is `employer.id`, just as we'd like. (A minimal sketch of this reproduction appears after this message.) Aside from the difference in approach, I have some other concerns around this PR. I don't think we should push down `IsNotNull(employer)` to the reader unless we need to. This PR includes that pushed down filter for both of the sample queries I provided in my previous comment https://github.com/apache/spark/pull/22357#issuecomment-419612555. The question is: how does that pushdown affect the reader's behavior? That leads me to a concern around the testing of this functionality. Our intent is to read from as few columns as necessary. In the query `select employer.id from contacts where employer.id = 0` we need only read from the `employer.id` column. And we can tell the reader to only read that column. But how do we know that pushing down `IsNotNull(employer)` does not negate that instruction? One way to be certain is to not push that filter down in the first place. That is the approach my patch currently takes. Of course, this removes the pushdown. I think that identifying which plan leads to a faster scan requires a more robust testing capability; however, one thing is certain: the `FileScan` in my patch's plan gives no reason to believe that it is reading anything other than that one column. IMO, we can get closer to settling the question of relative performance/behavior by pushing down Parquet reader filters just for the columns we need, e.g. `IsNotNull(employer.id)` in this case above. Neither patch (currently) does that; however, I think my patch is closer to achieving that because it already identifies `isnotnull(employer#4445.id)` as a filter predicate in the query plan. We just need to push it down.
As I mentioned, I'll endeavor to have my patch posted as a PR by tomorrow morning, but I can't make a promise of that. I'm sorry for the delay. I really wasn't expecting we'd work on this functionality for Spark 2.4. We do have a known bug in the schema pruning functionality that's in Spark 2.4, one that throws an error. We identified it in #21320 (look for the "ignored" test in `ParquetSchemaPruningSuite.scala`), but I don't think we have an issue in Jira for it. I'll try to take care of that by tomorrow morning as well, and I was hoping we would prioritize that. I have a patch for that bug that is code complete but missing proper code documentation. Thanks all. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
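A sketch of the constraint-propagation reproduction described in the comment above, intended for a `spark-shell` session (so `spark` is in scope). It assumes the `contacts` data model from `ParquetSchemaPruningSuite.scala` has been written out as a partitioned Parquet table named `contacts`; both config keys are the ones quoted in that comment.
```scala
// Assumes a partitioned Parquet `contacts` table matching the ParquetSchemaPruningSuite model.
spark.conf.set("spark.sql.nestedSchemaPruning.enabled", "true")

// Default behavior: constraint propagation derives isnotnull(employer), which the
// comment above identifies as the thing inhibiting schema pruning.
spark.conf.set("spark.sql.constraintPropagation.enabled", "true")
spark.sql("select employer.id from contacts where employer.id = 0").explain()

// With constraint propagation off, the scan's ReadSchema is expected to contain
// only employer.id, matching the plan quoted above.
spark.conf.set("spark.sql.constraintPropagation.enabled", "false")
spark.sql("select employer.id from contacts where employer.id = 0").explain()
```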
[GitHub] spark issue #22357: [SPARK-25363][SQL] Fix schema pruning in where clause by...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22357 I have reconstructed my original patch for this issue, but I've discovered it will require more work to complete. However, as part of that reconstruction I've discovered a couple of cases where our patches create different physical plans. The query results are the same, but I'm not sure which plan, if either, is correct. I want to go into detail on that, but it's complicated and I have to call it quits tonight. I have a flight in the morning, and I'll be on break next week. In the meantime, I'll just copy and paste two queries, based on the data in `ParquetSchemaPruningSuite.scala`, with two query plans each. First query: `select employer.id from contacts where employer is not null` This PR (as of d68f808) produces: ``` == Physical Plan == *(1) Project [employer#4442.id AS id#4452] +- *(1) Filter isnotnull(employer#4442) +- *(1) FileScan parquet [employer#4442,p#4443] Batched: false, Format: Parquet, PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(employer)], ReadSchema: struct> ``` My WIP patch produces: ``` == Physical Plan == *(1) Project [employer#4442.id AS id#4452] +- *(1) Filter isnotnull(employer#4442) +- *(1) FileScan parquet [employer#4442,p#4443] Batched: false, Format: Parquet, PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(employer)], ReadSchema: struct>> ``` Second query: `select employer.id from contacts where employer.id = 0` This PR produces: ``` == Physical Plan == *(1) Project [employer#4297.id AS id#4308] +- *(1) Filter (isnotnull(employer#4297) && (employer#4297.id = 0)) +- *(1) FileScan parquet [employer#4297,p#4298] Batched: false, Format: Parquet, PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(employer)], ReadSchema: struct> ``` My WIP patch produces: ``` == Physical Plan == *(1) Project [employer#4445.id AS id#4456] +- *(1) Filter (isnotnull(employer#4445.id) && (employer#4445.id = 0)) +- *(1) FileScan parquet [employer#4445,p#4446] Batched: false, Format: Parquet, PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct> ``` I wanted to give my thoughts on the differences of these in detail, but I have to wrap up my work for the night. I'll be visiting family next week. I don't know how responsive I'll be in that time, but I'll at least try to check back. Cheers. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22357: [SPARK-25363][SQL] Fix schema pruning in where clause by...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22357 Hi @viirya, Thanks for this PR! I have an alternative implementation which I'd like to submit for comparison. My implementation was something I removed from my original patch. I hope to have my PR submitted sometime today. I have another PR to submit, too. I'll be sure to refer to your PR in mine. Cheers. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15673: [SPARK-17992][SQL] Return all partitions from Hiv...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/15673#discussion_r216037341 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -586,17 +587,31 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]] } else { logDebug(s"Hive metastore filter is '$filter'.") +val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL +val tryDirectSql = + hive.getConf.getBoolean(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal) try { + // Hive may throw an exception when calling this method in some circumstances, such as + // when filtering on a non-string partition column when the hive config key + // hive.metastore.try.direct.sql is false getPartitionsByFilterMethod.invoke(hive, table, filter) .asInstanceOf[JArrayList[Partition]] } catch { - case e: InvocationTargetException => -// SPARK-18167 retry to investigate the flaky test. This should be reverted before -// the release is cut. -val retry = Try(getPartitionsByFilterMethod.invoke(hive, table, filter)) -logError("getPartitionsByFilter failed, retry success = " + retry.isSuccess) -logError("all partitions: " + getAllPartitions(hive, table)) -throw e + case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] && + !tryDirectSql => +logWarning("Caught Hive MetaException attempting to get partition metadata by " + + "filter from Hive. Falling back to fetching all partition metadata, which will " + + "degrade performance. Modifying your Hive metastore configuration to set " + + s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex) +// HiveShim clients are expected to handle a superset of the requested partitions +getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]] + case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] && + tryDirectSql => +throw new RuntimeException("Caught Hive MetaException attempting to get partition " + --- End diff -- Hi @rezasafi I believe the reasoning is if the user has disabled direct sql, we will try to fetch the partitions for the requested partition predicate anyway. However, since we don't expect that call to succeed, we just log a warning and fallback to the legacy behavior. On the other hand, if the user has enabled direct sql, then we expect the call to Hive to succeed. If it fails, we consider that an error and throw an exception. I hope that helps clarify things. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 Thanks everyone for your contributions, support and patience. It's been a journey and a half, and I'm excited for the future. I will open a follow-on PR to address the current known failure scenario (see ignored test) in this patch, and we can discuss if/how we can get it into 2.4 as well. I know there are many early adopters of this patch and #16578. Bug reports will continue to be very helpful. Beyond this patch, there are many possibilities for widening the scope of schema pruning. As part of our review process, we've pared the scope of this capability to just projection. IMHO, the first limitation we should address post 2.4 is supporting pruning with query filters of nested fields ("where" clauses). Joins, aggregations and window queries would be powerful enhancements as well, bringing the scope of schema pruning to analytic queries. I believe all of the additional features VideoAmp has implemented for schema pruning are independent of the underlying column store. Future enhancements should be automagically inherited by any column store that implements functionality analogous to `ParquetSchemaPruning.scala`. This should widen not just the audience that can be reached, but the developer community that can contribute and review. Thanks again. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > @mallman Could you remove the changes made in ParquetRowConverter.scala and also turn off spark.sql.nestedSchemaPruning.enabled by default in this PR? Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r212396370 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter( override def start(): Unit = { var i = 0 -while (i < currentRow.numFields) { +while (i < fieldConverters.length) { fieldConverters(i).updater.start() currentRow.setNullAt(i) i += 1 } +while (i < currentRow.numFields) { --- End diff -- These changes are related to my fix for the ignored unit test. If I apply my fix but keep the master version of this file, 24 unit tests fail. If I apply my fix along with this file diff then all tests pass, including the test that is currently ignored. I'm not sure I can develop a unit test for this current commit that should pass but will fail without this file's changes. I haven't spent any time thinking about it, and I really need to work on other things right now. If you want I will back out this change. However, I will re-incorporate it in a follow-on PR. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r212388958 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter( override def start(): Unit = { var i = 0 -while (i < currentRow.numFields) { +while (i < fieldConverters.length) { fieldConverters(i).updater.start() currentRow.setNullAt(i) i += 1 } +while (i < currentRow.numFields) { --- End diff -- I'll get back to you on this shortly. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 @gatorsmile Any concerns about merging this PR at this point? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 @gatorsmile How does this look? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > Add some test cases when turning on spark.sql.caseSensitive? Will do. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > Try this when spark.sql.nestedSchemaPruning.enabled is on? I don't think this will be difficult to fix. I'm working on it now and will add relevant test coverage. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > Try this when spark.sql.nestedSchemaPruning.enabled is on? This is a case-sensitivity issue (obviously). I'll get to the root of it. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > I see no point of leaving this PR open. I don't agree with you on that point, and I've expressed my view in https://github.com/apache/spark/pull/21889#issuecomment-413655304. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 Essentially, this PR was created to take the management of #21320 out of my hands, with a view towards facilitating its incorporation into Spark 2.4. It was my suggestion, one based in frustration. In hindsight, I no longer believe this strategy is the best, or most expedient, approach towards progress. Indeed, I believe the direction of this PR has become orthogonal to its motivating goal, becoming a dispute between myself and @HyukjinKwon rather than a means to move things along. I believe I can shepherd #21320 in a way that will promote greater progress. @ajacques, I mean no disrespect, and I thank you for volunteering your time, patience and effort for the sake of all that are interested in seeing this patch become a part of Spark. And I apologize for letting you down, letting everyone down. In my conduct leading up to the creation of this PR I did not act with the greatest maturity or patience. And I did not act in the best interests of the community. No one has spent more time or more effort, taken more responsibility or exhibited more patience with this 2+ year patch-set-in-the-making than myself. I respectfully submit it is mine to present and manage, and no one else's. Insofar as I have expressed otherwise in the past, I admit my error, one made in frustration, and recant in hindsight. @ajacques, at this point I respectfully assert that managing the patch set I submitted in #21320 is not your responsibility, nor is it anyone else's but mine. I ask you to close this PR so that we can resume the review in #21320. As I stated there, you are welcome to open a PR on https://github.com/VideoAmp/spark-public/tree/spark-4502-parquet_column_pruning-foundation to submit the changes you've made for review. Thank you. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 >> Hello, we've been using your patch at Stripe and we've found something that looks like a new bug: > > Thank you for sharing this, @xinxin-stripe. This is very helpful. I will investigate and report back. I have not been able to reproduce this issue with this branch at commit 0e5594b6ac1dcb94f3f0166e66a7d4e7eae3d00c. However, I'm seeing the same failure scenario as yours on VideoAmp's internal 2.1, 2.2 and 2.3 backports of this branch. I think the reason for this difference is that our internal branches (and probably yours) incorporate rules to support pruning for aggregations. That functionality was removed from this PR. I will fix this and share the fix with you. It would help if you could send me a scenario where you can reproduce this failure with a Spark SQL query. Query plans for datasets built from SQL queries tend to be much more readable. Consider e-mailing me directly on this issue because it does not appear to be strictly related to this PR. My e-mail address is m...@allman.ms. Thanks again! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > Hello, we've been using your patch at Stripe and we've found something that looks like a new bug: Thank you for sharing this, @xinxin-stripe. This is very helpful. I will investigate and report back. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > Due to the urgency of the upcoming 2.4 code freeze, I'm going to open this PR to collect any feedback. This can be closed if you prefer to continue to the work in the original PR. That would be my preference, yes, especially if it means less work for you. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > @mallman if you're planning on making more code changes, would you be willing to work on a shared branch or something? I've been working to incorporate the CR comments. No, however if you want to open a PR against the VideoAmp spark-4502-parquet_column_pruning-foundation branch I will review your changes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 >> the window of opportunity to review syntax and style in this PR closed long ago. > Why/when is this window closed? Who closed that? What I wrote above is a coarse approximation of my stance on the matter. It's inadequate, and I have struggled to adequately express myself. Reflecting on this last night I believe I was able to nail down exactly what I want to write, but I don't have time to write right now. I will reply in full later, within a day or two. I will also address your recent comments and questions. Thank you. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 @ajacques I added a commit to enable schema pruning by default. It's a little more complete than your commit to do the same. Please rebase off my branch and remove your commit. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > Then should we keep this one or #21889? shall we deduplicate the efforts? I requested to open that because this looks going to be inactive per your comments. As I stated before, I'll continue pushing changes to this branch. However, the window of opportunity to review syntax and style in this PR closed long ago. If someone wants to put forward that kind of comment for review I will consider it at my discretion. I'm not going to guarantee action or even a response. If someone relays a bug or a concern regarding correctness or performance, I will address it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 >> @mallman, while we wait for the go-no-go, do you have the changes for the next PR ready? Is there anything you need help with? > I have the hack I used originally, but I haven't tried finding a better solution yet. It could take some time to understand the underlying problem/incompatibility/misunderstanding/etc. I spent some time yesterday digging deeper into why the hack I wrote worked, and I think I understand now. Practically speaking, my follow-on PR will be about the same as the commit I removed. However, I can support it with some explanatory comments instead of just "this throws an exception sometimes". --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > @mallman, can we close this PR? Are you willing to update here or not? I pushed an update less than a day ago, and I intend to continue pushing updates as needed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > @mallman, while we wait for the go-no-go, do you have the changes for the next PR ready? Is there anything you need help with? I have the hack I used originally, but I haven't tried finding a better solution yet. It could take some time to understand the underlying problem/incompatibility/misunderstanding/etc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 @ajacques Please rebase off my branch. @gatorsmile I don't recall seeing that error before. Any idea for how I can reproduce and debug? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 Are we waiting for @gatorsmile's go-ahead and merge? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > just for clarification, so now .. there no outstanding bugs, some tests are ignored per #21320 (comment) and left comments were mostly addressed. Did i understand correctly? The ignored tests, and the scenarios they are intended to test, will fail with a runtime exception if this feature is enabled. I put forward a fix in `ParquetReadSupport.scala`, but @gatorsmile didn't want to address that in this PR. Otherwise, there are no known bugs with this patch. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 See https://github.com/apache/spark/pull/21320#issuecomment-406353694 for @gatorsmile's request to move the changes to `ParquetReadSupport.scala` to another PR. There was another, unrelated bug reported by @jainaks and addressed in https://github.com/apache/spark/pull/21320#issuecomment-408588685. AFAIK, there's nothing outstanding blocking this PR from being merged as I stated in https://github.com/apache/spark/pull/21889#issuecomment-410557228. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > Assuming from #21889 (comment), we shouldn't have any identified bug here. What kind of bugs left to be fixed? That bug was addressed by b50ddb4. We still need to fix the bug underlying the failing (ignored) test case. I have a tentative fix for that, but @gatorsmile wants to review it in a follow-on PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21889#discussion_r208446828 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.io.File + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.execution.FileSchemaPruningTest +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext + +class ParquetSchemaPruningSuite +extends QueryTest +with ParquetTest +with FileSchemaPruningTest +with SharedSQLContext { + case class FullName(first: String, middle: String, last: String) + case class Contact( +id: Int, +name: FullName, +address: String, +pets: Int, +friends: Array[FullName] = Array(), +relatives: Map[String, FullName] = Map()) + + val janeDoe = FullName("Jane", "X.", "Doe") + val johnDoe = FullName("John", "Y.", "Doe") + val susanSmith = FullName("Susan", "Z.", "Smith") + + val contacts = +Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith), + relatives = Map("brother" -> johnDoe)) :: +Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe)) :: Nil + + case class Name(first: String, last: String) + case class BriefContact(id: Int, name: Name, address: String) + + val briefContacts = +BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") :: +BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil + + case class ContactWithDataPartitionColumn( +id: Int, +name: FullName, +address: String, +pets: Int, +friends: Array[FullName] = Array(), +relatives: Map[String, FullName] = Map(), +p: Int) + + case class BriefContactWithDataPartitionColumn(id: Int, name: Name, address: String, p: Int) + + val contactsWithDataPartitionColumn = +contacts.map { case Contact(id, name, address, pets, friends, relatives) => + ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, 1) } + val briefContactsWithDataPartitionColumn = +briefContacts.map { case BriefContact(id, name, address) => + BriefContactWithDataPartitionColumn(id, name, address, 2) } + + testSchemaPruning("select a single complex field") { +val query = sql("select name.middle from contacts order by id") +checkScanSchemata(query, "struct>") +checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil) + } + + testSchemaPruning("select a single complex field and its parent struct") { +val query = sql("select name.middle, name from contacts order by id") +checkScanSchemata(query, "struct>") +checkAnswer(query, + Row("X.", Row("Jane", "X.", "Doe")) :: + Row("Y.", Row("John", "Y.", "Doe")) :: + Row(null, 
Row("Janet", null, "Jones")) :: + Row(null, Row("Jim", null, "Jones")) :: + Nil) + } + + testSchemaPruning("select a single complex field array and its parent struct array") { +val query = sql("select friends.middle, friends from contacts where p=1 order by id") +checkScanSchemata(query, + "struct>>") +checkAnswer(query, + Row(Array("Z."), Array(Row(&
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > Alright to make sure we're all on the same page, it sounds like we're ready to merge this PR pending: > > * Successful build by Jenkins > * Any PR comments from a maintainer > > This feature will be merged in disabled state and can't be enabled until the next PR is merged, but we do not expect any regression in behavior in the default disabled state. I agree. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > @mallman Is it related to this revert in ParquetReadSupport.scala? I re-added this logic and all 32 tests in ParquetSchemaPruningSuite passed. Yes. That's what we need to work on in the next PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 I've pushed a commit to restore the original test coverage while also ensuring determinism of the output. Don't ask me how I did it. It's a secret! The test that was failing before (it was only kinda passing) is now failing again, so I marked it ignored so it wouldn't break Jenkins. And I reverted the commit that enabled this feature by default, because it's still broken. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > select id, name.middle, address from temp - Works > select name.middle, address from temp - Fails > select name.middle from temp - Works > select name.middle, id, address from temp - Works > select name.middle, address, id from temp - Works Removing the `order by` clause from your test query caused it to fail, but it has nothing to do with ordering. It appears that the failure in this case is manifested when the file scan schema is exactly the `name.middle` and `address` columns. Introducing the `order by` clauses in the test suite queries gave them necessary determinism for checking query answers, but these modifications also altered the file scan schema. I need to fix the tests, but I think that the failure underlying the previously ignored test case has not been resolved after all. It was just a case of confusing coincidence. Unfortunately we're still not ready to merge this PR yet. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > Test build #94228 has finished for PR 21889 at commit 92901da. The test failure appears to be unrelated to this PR. Is it just me or has the test suite become flakier in the past few months? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > The tests as committed pass for me, but I removed the order by id and I got that error. Are you saying it works with the specific query in my comment? @ajacques Please try this query: ``` select id, name.middle, address from temp ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > The tests as committed pass for me, but I removed the order by id and I got that error. Are you saying it works with the specific query in my comment? Oh! I didn't notice you changed the query. Okay. I'll take a closer look. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > @mallman: I've rebased on top of your changes and pushed. I'm seeing the following: That test passes for me locally. Also, I inspected your branch and could not find any errors in the rebase. What commit hash are you testing locally? I'm using `92901da3785ce94db501a4c3d9be6316cfbf29a9`. Please ensure we're on the same commit. If so, try doing an `sbt clean` and running your test again. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > @mallman: I've rebased on top of your changes and pushed. I'm seeing the following That's the test case that I "unignored". It was passing. There must be some simple explanation. I will look into it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org