[jira] [Commented] (SPARK-35287) RemoveRedundantProjects removes non-redundant projects
[ https://issues.apache.org/jira/browse/SPARK-35287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348184#comment-17348184 ]

Apache Spark commented on SPARK-35287:
--

User 'sarutak' has created a pull request for this issue:
https://github.com/apache/spark/pull/32606

> RemoveRedundantProjects removes non-redundant projects
> --
>
>                 Key: SPARK-35287
>                 URL: https://issues.apache.org/jira/browse/SPARK-35287
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: Chungmin
>            Priority: Major
>
> RemoveRedundantProjects erroneously removes non-redundant projects that are
> required to convert rows coming from DataSourceV2ScanExec to UnsafeRow. There
> is code for this case, but it only looks at the child. The bug occurs when
> DataSourceV2ScanExec is not a child of the project but a descendant. The
> method {{isRedundant}} in {{RemoveRedundantProjects}} should be updated to
> account for descendants too.
> The original scenario requires Iceberg to reproduce the issue. In theory, it
> should be possible to reproduce the bug with Spark SQL only, and someone more
> knowledgeable about Spark SQL should be able to construct such a scenario.
> The following is my reproduction scenario (Spark 3.1.1, Iceberg 0.11.1):
> {code:java}
> import scala.collection.JavaConverters._
> import org.apache.iceberg.{PartitionSpec, TableProperties}
> import org.apache.iceberg.hadoop.HadoopTables
> import org.apache.iceberg.spark.SparkSchemaUtil
> import org.apache.spark.sql.{DataFrame, QueryTest, SparkSession}
> import org.apache.spark.sql.internal.SQLConf
>
> class RemoveRedundantProjectsTest extends QueryTest {
>   override val spark: SparkSession = SparkSession
>     .builder()
>     .master("local[4]")
>     .config("spark.driver.bindAddress", "127.0.0.1")
>     .appName(suiteName)
>     .getOrCreate()
>
>   test("RemoveRedundantProjects removes non-redundant projects") {
>     withSQLConf(
>         // -1 disables broadcast joins, so the join is planned as a sort-merge join
>         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
>         // run physical operators without whole-stage code generation
>         SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
>         // keep the rule under test enabled
>         SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "true") {
>       withTempDir { dir =>
>         val path = dir.getCanonicalPath
>         val data = spark.range(3).toDF
>         val table = new HadoopTables().create(
>           SparkSchemaUtil.convert(data.schema),
>           PartitionSpec.unpartitioned(),
>           Map(TableProperties.WRITE_NEW_DATA_LOCATION -> path).asJava,
>           path)
>         data.write.format("iceberg").mode("overwrite").save(path)
>         table.refresh()
>
>         val df = spark.read.format("iceberg").load(path)
>         val dfX = df.as("x")
>         val dfY = df.as("y")
>         val join = dfX.filter(dfX("id") > 0).join(dfY, "id")
>         join.explain("extended")
>         assert(join.count() == 2)
>       }
>     }
>   }
> }
> {code}
> Stack trace:
> {noformat}
> [info] - RemoveRedundantProjects removes non-redundant projects *** FAILED ***
> [info] org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in
> stage 1.0 (TID 4) (xeroxms100.northamerica.corp.microsoft.com executor
> driver): java.lang.ClassCastException:
> org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast
> to org.apache.spark.sql.catalyst.expressions.UnsafeRow
> [info] at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
> [info] at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
> [info] at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
> [info] at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
> [info] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info] at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info] at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> [info] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info] at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info] at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info] at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info] at
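The reporter's suggestion, that isRedundant should look past the project's immediate child, can be illustrated with a toy model. This is a minimal, hypothetical sketch in plain Scala: Node, Scan, Filter, Project, RedundancyCheck, rowProducer and the emitsUnsafeRows flag are all invented for illustration and are not Spark's SparkPlan API. It assumes the project's output already matches its child's output, so only the row format decides redundancy, and that a filter forwards its input rows unchanged (as in the reproduction above, where whole-stage codegen is disabled).

```scala
// Hypothetical, simplified model of a physical plan (NOT Spark's SparkPlan API).
sealed trait Node
// A leaf that produces rows; emitsUnsafeRows = false models a row-based V2 scan
// whose output still has to be converted to UnsafeRow by a project above it.
final case class Scan(name: String, emitsUnsafeRows: Boolean) extends Node
// Assumption: a filter passes its input rows through unchanged (same row format).
final case class Filter(child: Node) extends Node
// The operator whose redundancy is being decided.
final case class Project(child: Node) extends Node

object RedundancyCheck {
  // Child-only check, modelling the behaviour described in the issue:
  // a non-unsafe scan is noticed only when it is the project's direct child.
  def isRedundantChildOnly(p: Project): Boolean = p.child match {
    case Scan(_, unsafe) => unsafe
    case _               => true
  }

  // Walks through row-preserving operators (only Filter in this toy model)
  // down to the node that actually produces the rows the project receives.
  @annotation.tailrec
  def rowProducer(n: Node): Node = n match {
    case Filter(c) => rowProducer(c)
    case other     => other
  }

  // Descendant-aware check: the project is kept whenever its rows ultimately
  // come from a scan that does not emit UnsafeRow, even through filters.
  def isRedundantDescendantAware(p: Project): Boolean =
    rowProducer(p.child) match {
      case Scan(_, unsafe) => unsafe
      case _               => true
    }
}
```

For the shape in the reproduction, Project(Filter(Scan("v2", emitsUnsafeRows = false))), isRedundantChildOnly returns true (the project would be dropped and the filter would pass non-unsafe rows on towards the sort), while isRedundantDescendantAware returns false and keeps the project. Treating only filters as row-preserving is a choice made for this sketch; which operators a real fix walks through is not specified here, and the sketch does not claim to match the change in the linked pull request.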
[jira] [Commented] (SPARK-35287) RemoveRedundantProjects removes non-redundant projects
[ https://issues.apache.org/jira/browse/SPARK-35287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338291#comment-17338291 ]

Hyukjin Kwon commented on SPARK-35287:
--

cc [~aokolnychyi] FYI