This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 1f4c8e42479 [SPARK-40775][SQL] Fix duplicate description entries for V2 file scans
1f4c8e42479 is described below

commit 1f4c8e42479a9c9df37864507dfd39d9819fe83c
Author: Adam Binford <adam...@gmail.com>
AuthorDate: Mon Dec 12 18:15:58 2022 +0800

    [SPARK-40775][SQL] Fix duplicate description entries for V2 file scans

    ### What changes were proposed in this pull request?

    Remove the overridden `description` method from the V2 file sources. `FileScan` already uses all of the metadata to create the description, so appending the same fields in an overridden `description` creates duplicate entries.

    ### Why are the changes needed?

    An example Parquet scan from the aggregate pushdown suite.

    Before:
    ```
    +- BatchScan parquet file:/...[min(_3)#814, max(_3)#815, min(_1)#816, max(_1)#817, count(*)#818L, count(_1)#819L, count(_2)#820L, count(_3)#821L] ParquetScan DataFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedAggregation: [MIN(_3), MAX(_3), MIN(_1), MAX(_1), COUNT(*), COUNT(_1), COUNT(_2), COUNT(_3)], PushedFilters: [], PushedGroupBy: [], ReadSchema: struct<min(_3):int,max(_3):int,min(_1):int,max(_1):int,count(*):bigint,count( [...]
    ```

    After:
    ```
    +- BatchScan parquet file:/...[min(_3)#814, max(_3)#815, min(_1)#816, max(_1)#817, count(*)#818L, count(_1)#819L, count(_2)#820L, count(_3)#821L] ParquetScan DataFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedAggregation: [MIN(_3), MAX(_3), MIN(_1), MAX(_1), COUNT(*), COUNT(_1), COUNT(_2), COUNT(_3)], PushedFilters: [], PushedGroupBy: [], ReadSchema: struct<min(_3):int,max(_3):int,min(_1):int,max(_1):int,count(*):bigint,count [...]
    ```

    ### Does this PR introduce _any_ user-facing change?

    Just a description change in the explain output.

    ### How was this patch tested?

    Updated a few UTs to accommodate checking the explain string.

    Closes #38229 from Kimahriman/remove-file-source-description.

    Authored-by: Adam Binford <adam...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
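To see why the overrides were redundant, consider a minimal, self-contained sketch of the pattern; `BaseFileScan`, `ExampleScan`, and `ExampleScanDemo` are hypothetical stand-ins, not Spark's actual `FileScan` hierarchy. When the base class derives `description()` from `getMetaData()` (as the commit message says `FileScan` does), a subclass only needs to extend the metadata map:

```scala
// Minimal sketch; BaseFileScan and ExampleScan are hypothetical, not Spark classes.
trait BaseFileScan {
  // Subclasses contribute explain fields through this map.
  def getMetaData(): Map[String, String] = Map("Format" -> "file")

  // The description is rendered entirely from the metadata, so a field that is
  // both in getMetaData() and re-appended by an overridden description()
  // shows up twice in explain output -- the bug this commit fixes.
  def description(): String =
    getMetaData().map { case (k, v) => s"$k: $v" }.mkString(", ")
}

class ExampleScan(pushedFilters: Seq[String]) extends BaseFileScan {
  // Post-fix approach: extend only the metadata and inherit description().
  override def getMetaData(): Map[String, String] =
    super.getMetaData() ++
      Map("PushedFilters" -> pushedFilters.mkString("[", ", ", "]"))
}

object ExampleScanDemo extends App {
  // Prints: Format: file, PushedFilters: [IsNotNull(value)]
  println(new ExampleScan(Seq("IsNotNull(value)")).description())
}
```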
---
 .../scala/org/apache/spark/sql/v2/avro/AvroScan.scala |  4 ----
 .../sql/execution/datasources/v2/csv/CSVScan.scala    |  4 ----
 .../sql/execution/datasources/v2/json/JsonScan.scala  |  4 ++--
 .../sql/execution/datasources/v2/orc/OrcScan.scala    |  6 ------
 .../datasources/v2/parquet/ParquetScan.scala          |  6 ------
 .../scala/org/apache/spark/sql/ExplainSuite.scala     | 19 +++++--------------
 .../FileSourceAggregatePushDownSuite.scala            |  3 ++-
 7 files changed, 9 insertions(+), 37 deletions(-)

diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala
index d0f38c12427..763b9abe4f9 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala
@@ -70,10 +70,6 @@ case class AvroScan(
 
   override def hashCode(): Int = super.hashCode()
 
-  override def description(): String = {
-    super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
-  }
-
   override def getMetaData(): Map[String, String] = {
     super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
index d81223b48a5..734f8165aff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
@@ -91,10 +91,6 @@ case class CSVScan(
 
   override def hashCode(): Int = super.hashCode()
 
-  override def description(): String = {
-    super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
-  }
-
   override def getMetaData(): Map[String, String] = {
     super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
index 9ab367136fc..c9a3a6f5e7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
@@ -91,7 +91,7 @@ case class JsonScan(
 
   override def hashCode(): Int = super.hashCode()
 
-  override def description(): String = {
-    super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
+  override def getMetaData(): Map[String, String] = {
+    super.getMetaData() ++ Map("PushedFilters" -> pushedFilters.mkString("[", ", ", "]"))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index ccb9ca9c6b3..072ab26774e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -92,12 +92,6 @@ case class OrcScan(
       ("[]", "[]")
   }
 
-  override def description(): String = {
-    super.description() + ", PushedFilters: " + seqToString(pushedFilters) +
-      ", PushedAggregation: " + pushedAggregationsStr +
-      ", PushedGroupBy: " + pushedGroupByStr
-  }
-
   override def getMetaData(): Map[String, String] = {
     super.getMetaData() ++
       Map("PushedFilters" -> seqToString(pushedFilters)) ++
       Map("PushedAggregation" -> pushedAggregationsStr) ++
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index ff0b38880fd..619a8fe66e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -127,12 +127,6 @@ case class ParquetScan(
       ("[]", "[]")
   }
 
-  override def description(): String = {
-    super.description() + ", PushedFilters: " + seqToString(pushedFilters) +
-      ", PushedAggregation: " + pushedAggregationsStr +
-      ", PushedGroupBy: " + pushedGroupByStr
-  }
-
   override def getMetaData(): Map[String, String] = {
     super.getMetaData() ++
       Map("PushedFilters" -> seqToString(pushedFilters)) ++
       Map("PushedAggregation" -> pushedAggregationsStr) ++
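For the two scans that also support aggregate pushdown (ORC and Parquet), all three explain fields now flow through the single metadata map. A standalone sketch of that shape; `AggPushDownScanSketch` and `AggPushDownDemo` are hypothetical names, and `seqToString` is reimplemented locally to mirror the formatting seen in the diff:

```scala
// Hypothetical stand-in for the Orc/Parquet getMetaData() shape after this
// commit: every explain field is a metadata entry, and nothing is appended
// to description() directly.
trait AggPushDownScanSketch {
  def pushedFilters: Seq[String]
  def pushedAggregationsStr: String
  def pushedGroupByStr: String

  // Local helper mirroring the seqToString formatting used in the diff.
  private def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")

  def getMetaData(): Map[String, String] =
    Map(
      "PushedFilters" -> seqToString(pushedFilters),
      "PushedAggregation" -> pushedAggregationsStr,
      "PushedGroupBy" -> pushedGroupByStr)
}

object AggPushDownDemo extends App {
  val scan = new AggPushDownScanSketch {
    val pushedFilters = Seq.empty[String]
    val pushedAggregationsStr = "[MIN(_3), MAX(_3)]"
    val pushedGroupByStr = "[]"
  }
  // Prints each field exactly once, because only the metadata map renders it:
  // PushedFilters: [], PushedAggregation: [MIN(_3), MAX(_3)], PushedGroupBy: []
  println(scan.getMetaData().map { case (k, v) => s"$k: $v" }.mkString(", "))
}
```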
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index 50c3b8fbf48..b5353455dc2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -462,17 +462,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
     withTempDir { dir =>
       Seq("parquet", "orc", "csv", "json").foreach { fmt =>
         val basePath = dir.getCanonicalPath + "/" + fmt
-        val pushFilterMaps = Map (
-          "parquet" ->
-            "|PushedFilters: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]",
-          "orc" ->
-            "|PushedFilters: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]",
-          "csv" ->
-            "|PushedFilters: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]",
-          "json" ->
-            "|remove_marker"
-        )
-        val expected_plan_fragment1 =
+
+        val expectedPlanFragment =
           s"""
              |\\(1\\) BatchScan $fmt file:$basePath
              |Output \\[2\\]: \\[value#x, id#x\\]
@@ -480,9 +471,9 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
              |Format: $fmt
              |Location: InMemoryFileIndex\\([0-9]+ paths\\)\\[.*\\]
              |PartitionFilters: \\[isnotnull\\(id#x\\), \\(id#x > 1\\)\\]
-             ${pushFilterMaps.get(fmt).get}
+             |PushedFilters: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]
              |ReadSchema: struct\\<value:int\\>
-             |""".stripMargin.replaceAll("\nremove_marker", "").trim
+             |""".stripMargin.trim
 
         spark.range(10)
           .select(col("id"), col("id").as("value"))
@@ -500,7 +491,7 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
           .format(fmt)
           .load(basePath).where($"id" > 1 && $"value" > 2)
         val normalizedOutput = getNormalizedExplain(df, FormattedMode)
-        assert(expected_plan_fragment1.r.findAllMatchIn(normalizedOutput).length == 1)
+        assert(expectedPlanFragment.r.findAllMatchIn(normalizedOutput).length == 1)
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala
index a68d9b951b7..e8fae210fa4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala
@@ -347,7 +347,8 @@ trait FileSourceAggregatePushDownSuite
     spark.read.format(format).load(file.getCanonicalPath).createOrReplaceTempView("test")
     Seq("false", "true").foreach { enableVectorizedReader =>
       withSQLConf(aggPushDownEnabledKey -> "true",
-        vectorizedReaderEnabledKey -> enableVectorizedReader) {
+        vectorizedReaderEnabledKey -> enableVectorizedReader,
+        SQLConf.MAX_METADATA_STRING_LENGTH.key -> "1000") {
 
         val testMinWithAllTypes = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
           "min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
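The assertion style in the updated ExplainSuite test is worth noting: requiring exactly one regex match of the expected fragment fails on both a missing entry (zero matches) and the duplicated-description bug this commit fixes (two matches). A standalone sketch of the idea; the explain output below is a fabricated stand-in, not real Spark output:

```scala
// Standalone illustration of the exactly-once matching used in ExplainSuite.
object ExplainFragmentCheck extends App {
  // Stand-in for normalized FormattedMode explain output (fabricated example).
  val explainOutput =
    """(1) BatchScan parquet file:/tmp/data
      |PushedFilters: [IsNotNull(value), GreaterThan(value,2)]
      |ReadSchema: struct<value:int>""".stripMargin

  val fragment = """PushedFilters: \[IsNotNull\(value\), GreaterThan\(value,2\)\]""".r

  // 0 matches => the field is missing; 2 matches => it was duplicated.
  assert(fragment.findAllMatchIn(explainOutput).length == 1)
}
```

The companion change in FileSourceAggregatePushDownSuite raises `SQLConf.MAX_METADATA_STRING_LENGTH` to 1000, presumably so the long pushed-aggregation lists are not truncated out of the explain string before the assertions can match them.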
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org