[GitHub] spark pull request #21295: [SPARK-24230][SQL] Fix SpecificParquetRecordReade...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21295

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r190379077

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java ---
@@ -147,7 +147,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
     this.reader = new ParquetFileReader(
       configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
-    for (BlockMetaData block : blocks) {
+    // use the blocks from the reader in case some do not match filters and will not be read
--- End diff --

Yes, we will need to backport this to the 2.3.x line. No rush to make it for 2.3.1, though, since dictionary filtering is off by default and this isn't a correctness problem.
Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r190378887

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java ---
@@ -225,7 +226,8 @@ protected void initialize(String path, List<String> columns) throws IOException
     this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema);
     this.reader = new ParquetFileReader(
       config, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
-    for (BlockMetaData block : blocks) {
+    // use the blocks from the reader in case some do not match filters and will not be read
+    for (BlockMetaData block : reader.getRowGroups()) {
--- End diff --

Dictionary filtering is off by default in 1.8.x. It was enabled in 1.9.x, after we built confidence in its correctness. We should backport this fix to 2.3.x also, but the only downside to not having it is that dictionary filtering will throw an exception when it is enabled. So the feature just isn't available.
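The mismatch behind this fix can be reduced to a few lines. The sketch below is a self-contained illustration, not the real Parquet API: `RowGroup` stands in for Parquet's `BlockMetaData`, and the class and method names are invented for the example. It shows why summing row counts over the footer's full block list over-states what a reader that has dropped row groups can actually return.

```java
import java.util.Arrays;
import java.util.List;

public class RowGroupCountDemo {
  // Stand-in for Parquet's BlockMetaData: only the row count matters here.
  static class RowGroup {
    final long rowCount;
    RowGroup(long rowCount) { this.rowCount = rowCount; }
  }

  // Sum row counts over a list of row groups, as the Spark reader does when
  // it computes its expected total row count during initialization.
  static long totalRowCount(List<RowGroup> groups) {
    long total = 0;
    for (RowGroup g : groups) {
      total += g.rowCount;
    }
    return total;
  }

  public static void main(String[] args) {
    // Row groups as listed in the file footer.
    List<RowGroup> footerGroups =
        Arrays.asList(new RowGroup(100), new RowGroup(100), new RowGroup(100));
    // Suppose dictionary filtering drops the middle group before reading.
    List<RowGroup> readerGroups = Arrays.asList(footerGroups.get(0), footerGroups.get(2));

    long expectedFromFooter = totalRowCount(footerGroups); // 300 rows expected
    long actuallyReadable = totalRowCount(readerGroups);   // only 200 rows exist

    // Counting from the footer's unfiltered list over-states the rows the
    // reader can return; the reader's own row groups give the right answer.
    System.out.println(expectedFromFooter); // prints "300"
    System.out.println(actuallyReadable);   // prints "200"
  }
}
```

Iterating `reader.getRowGroups()` instead of the footer's `blocks` is the same idea: count only what the reader will actually hand back.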
Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r190377736

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---
@@ -879,6 +879,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-24230: filter row group using dictionary") {
+    withSQLConf(("parquet.filter.dictionary.enabled", "true")) {
--- End diff --

Looks like the problem is a bug in Parquet. It is using the [stats property instead of the dictionary property](https://github.com/apache/parquet-mr/blob/8bbc6cb95fd9b4b9e86c924ca1e40fd555ecac1d/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java#L83). This is minor because there is almost no reason to turn either one off now that we've built more confidence in the filters.
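The linked parquet-mr bug boils down to looking up the wrong configuration key. A hedged sketch of the pattern follows; the class, method, and lookup code are illustrative stand-ins, not the actual `HadoopReadOptions` source, and only the two property names come from this discussion. It shows why setting `parquet.filter.dictionary.enabled` to `false` has no effect when the dictionary-filter setting is initialized from the stats-filter key.

```java
import java.util.HashMap;
import java.util.Map;

public class WrongPropertyDemo {
  static final String STATS_KEY = "parquet.filter.statistics.enabled";
  static final String DICT_KEY = "parquet.filter.dictionary.enabled";

  // Minimal stand-in for Hadoop's Configuration.getBoolean(key, default).
  static boolean getBoolean(Map<String, String> conf, String key, boolean defaultValue) {
    String value = conf.get(key);
    return value == null ? defaultValue : Boolean.parseBoolean(value);
  }

  // Buggy initialization: it consults STATS_KEY where DICT_KEY was intended,
  // mirroring the wrong-property lookup described above.
  static boolean dictionaryFilteringEnabled(Map<String, String> conf) {
    return getBoolean(conf, STATS_KEY, true);
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put(DICT_KEY, "false"); // the user disables dictionary filtering...
    // ...but the buggy lookup never reads that key, so filtering stays on.
    System.out.println(dictionaryFilteringEnabled(conf)); // prints "true"
  }
}
```

This would also explain the earlier observation in the thread that the test behaves the same with the dictionary property set to `false`.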
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r190252764

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java ---
@@ -225,7 +226,8 @@ protected void initialize(String path, List<String> columns) throws IOException
     this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema);
     this.reader = new ParquetFileReader(
       config, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
-    for (BlockMetaData block : blocks) {
+    // use the blocks from the reader in case some do not match filters and will not be read
+    for (BlockMetaData block : reader.getRowGroups()) {
--- End diff --

I think this is an existing issue. Does your test case fail on Spark 2.3 too?
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r190251186

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---
@@ -879,6 +879,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-24230: filter row group using dictionary") {
+    withSQLConf(("parquet.filter.dictionary.enabled", "true")) {
--- End diff --

Using SQLConf is OK: Spark puts all SQL configs into the Hadoop conf, which is accessed by the Parquet writer on the executor side. I'm also curious why turning `parquet.filter.dictionary.enabled` off doesn't avoid this bug.
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r190076831

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---
@@ -879,6 +879,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-24230: filter row group using dictionary") {
+    withSQLConf(("parquet.filter.dictionary.enabled", "true")) {
--- End diff --

We are unable to pass this Parquet-specific parameter through `withSQLConf`. Below is the way to pass the Parquet option:

```Scala
withTable("t1") {
  spark.createDataFrame((0 until 100).map(i => ((i * 2) % 20, s"data-$i"))).write
    .option("parquet.filter.dictionary.enabled", false).saveAsTable("t1")
  checkAnswer(sql("SELECT _2 FROM t1 WHERE t1._1 = 5"), Seq.empty)
}
```

Could you help investigate why we still hit the error [without the fix] when we set `parquet.filter.dictionary.enabled` to `false`?
Github user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r189960120

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java ---
@@ -147,7 +147,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
     this.reader = new ParquetFileReader(
       configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
-    for (BlockMetaData block : blocks) {
+    // use the blocks from the reader in case some do not match filters and will not be read
--- End diff --

It looks correct to me, too. However, this comment isn't clear.
- If the comment is correct only for Parquet 1.10.0, please fix the comment.
- If the comment is correct in general, the failure should occur in Apache Spark 2.3.x (with the old Parquet). Why don't we fix that in 2.3.1? This is [my original suggestion](https://github.com/apache/spark/pull/21295#issuecomment-388656852).
Github user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r189957937

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---
@@ -879,6 +879,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-24230: filter row group using dictionary") {
+    withSQLConf(("parquet.filter.dictionary.enabled", "true")) {
--- End diff --

That was my point, too. This configuration is needed, but this code doesn't actually do anything to set it. To use this configuration correctly, we need to fix that first. We should not have no-op code like this.
Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r189748585

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---
@@ -879,6 +879,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-24230: filter row group using dictionary") {
+    withSQLConf(("parquet.filter.dictionary.enabled", "true")) {
--- End diff --

I don't think this is misleading. The dictionary filter needs to be on, and there's no guarantee from Parquet that the default will continue to be true.
Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r189748452

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java ---
@@ -147,7 +147,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
     this.reader = new ParquetFileReader(
       configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
-    for (BlockMetaData block : blocks) {
+    // use the blocks from the reader in case some do not match filters and will not be read
--- End diff --

Actually, it is fine, and more correct, for this to be ported to older versions. I doubt it will be, though, because it is unnecessary there.
Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r189748419

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---
@@ -879,6 +879,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-24230: filter row group using dictionary") {
+    withSQLConf(("parquet.filter.dictionary.enabled", "true")) {
--- End diff --

Actually, it is fine, and more correct, for this to be ported to older versions. I doubt it will be, though, because it is unnecessary there.
Github user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r187813544

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java ---
@@ -147,7 +147,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
     this.reader = new ParquetFileReader(
       configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
-    for (BlockMetaData block : blocks) {
+    // use the blocks from the reader in case some do not match filters and will not be read
--- End diff --

What I mean is: this patch is logically okay, but it is only valid for the `master` branch, i.e. Spark 2.4 with Parquet 1.10.0. For example, the test case will pass in `branch-2.3` without this patch because it uses Parquet 1.8.x. As you mentioned, it would be great if we had included this patch in #21070.
Github user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r187813149

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---
@@ -879,6 +879,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-24230: filter row group using dictionary") {
+    withSQLConf(("parquet.filter.dictionary.enabled", "true")) {
--- End diff --

If there is anyone who can lead this correctly, it's you, @rdblue. :) I knew that this is the Parquet default. With or without the patch to `SpecificParquetRecordReaderBase.java`, we should not add a no-op, invalid conf line like this `withSQLConf` here. It's misleading for the whole Spark community in the future. Please debug it and add a correct test case here.
Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r187809226

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---
@@ -879,6 +879,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-24230: filter row group using dictionary") {
+    withSQLConf(("parquet.filter.dictionary.enabled", "true")) {
--- End diff --

This is the default, so it is possible that it isn't getting passed to Parquet correctly. I can debug it at some point to find out why it passes with `false`. I did make sure that the test case fails without the fix, so we know it should be correctly using dictionary filtering.
Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r187809171

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java ---
@@ -147,7 +147,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
     this.reader = new ParquetFileReader(
       configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
-    for (BlockMetaData block : blocks) {
+    // use the blocks from the reader in case some do not match filters and will not be read
--- End diff --

I'm not sure what you mean here. This is a problem entirely in Spark, because Spark is reaching into Parquet internals for its vectorized support. There's no Parquet issue to reference.
Github user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r187778727

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java ---
@@ -147,7 +147,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
     this.reader = new ParquetFileReader(
       configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
-    for (BlockMetaData block : blocks) {
+    // use the blocks from the reader in case some do not match filters and will not be read
--- End diff --

Could you be more specific by mentioning the corresponding Parquet JIRA issue or version (1.10.0)?
Github user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/21295#discussion_r187778648

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---
@@ -879,6 +879,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-24230: filter row group using dictionary") {
+    withSQLConf(("parquet.filter.dictionary.enabled", "true")) {
--- End diff --

Is this a valid way to control this configuration? It seems to pass with `false`, too.
GitHub user rdblue opened a pull request:
https://github.com/apache/spark/pull/21295

[SPARK-24230][SQL] Fix SpecificParquetRecordReaderBase with dictionary filters.

## What changes were proposed in this pull request?

I missed this commit when preparing #21070. When Parquet is able to filter blocks with dictionary filtering, the expected total value count in Spark was too high, leading to an error when there were fewer row groups to process than expected. Spark should get the row groups from Parquet so that it picks up new filter schemes in Parquet, like dictionary filtering.

## How was this patch tested?

By hand. Need to add a test case.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rdblue/spark SPARK-24230-fix-parquet-block-tracking

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21295.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21295

commit 0fa5abeffc6184979d5909e29eb43d33991ed832
Author: Ryan Blue
Date: 2018-01-31T00:48:01Z

    SPARK-24230: Fix SpecificParquetRecordReaderBase with dictionary filters.

    Filtered blocks were causing the expected total value count to be too high, which led to an error when there were fewer than expected row groups to process.