[jira] [Commented] (SPARK-35287) RemoveRedundantProjects removes non-redundant projects

2021-05-20 Thread Apache Spark (Jira)


[ https://issues.apache.org/jira/browse/SPARK-35287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348184#comment-17348184 ]

Apache Spark commented on SPARK-35287:
--

User 'sarutak' has created a pull request for this issue:
https://github.com/apache/spark/pull/32606

> RemoveRedundantProjects removes non-redundant projects
> --
>
>                 Key: SPARK-35287
>                 URL: https://issues.apache.org/jira/browse/SPARK-35287
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: Chungmin
>            Priority: Major
>
> RemoveRedundantProjects erroneously removes non-redundant projects that are 
> required to convert rows coming from DataSourceV2ScanExec to UnsafeRow. There 
> is code that handles this case, but it only looks at the project's immediate 
> child. The bug occurs when DataSourceV2ScanExec is not a child of the project 
> but a descendant. The method {{isRedundant}} in {{RemoveRedundantProjects}} 
> should be updated to account for descendants too.
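> A minimal sketch of the descendant-aware check, assuming Spark 3.1 internals ({{DataSourceV2ScanExecBase}} is the 3.x base of the DSv2 scan node; {{containsDsv2Scan}} is a hypothetical helper, not existing Spark code):
> {code:java}
> import org.apache.spark.sql.execution.SparkPlan
> import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase
>
> // Hypothetical helper: true if any node in the subtree is a DSv2 scan.
> // A project above such a scan may still be needed to produce UnsafeRows,
> // so it must not be treated as redundant based on the direct child alone.
> def containsDsv2Scan(plan: SparkPlan): Boolean = plan match {
>   case _: DataSourceV2ScanExecBase => true
>   case p => p.children.exists(containsDsv2Scan)
> }
> {code}
> With a check like this, {{isRedundant}} could consider the whole subtree below the project instead of only its direct child.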
> The original scenario requires Iceberg to reproduce the issue. In theory, it 
> should be possible to reproduce the bug with Spark SQL alone, and someone more 
> knowledgeable about Spark SQL should be able to construct such a scenario. The 
> following is my reproduction scenario (Spark 3.1.1, Iceberg 0.11.1): 
> {code:java}
> import scala.collection.JavaConverters._
>
> import org.apache.iceberg.{PartitionSpec, TableProperties}
> import org.apache.iceberg.hadoop.HadoopTables
> import org.apache.iceberg.spark.SparkSchemaUtil
> import org.apache.spark.sql.{DataFrame, QueryTest, SparkSession}
> import org.apache.spark.sql.internal.SQLConf
>
> class RemoveRedundantProjectsTest extends QueryTest {
>   override val spark: SparkSession = SparkSession
>     .builder()
>     .master("local[4]")
>     .config("spark.driver.bindAddress", "127.0.0.1")
>     .appName(suiteName)
>     .getOrCreate()
>
>   test("RemoveRedundantProjects removes non-redundant projects") {
>     withSQLConf(
>       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
>       SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
>       SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "true") {
>       withTempDir { dir =>
>         val path = dir.getCanonicalPath
>         val data = spark.range(3).toDF
>         val table = new HadoopTables().create(
>           SparkSchemaUtil.convert(data.schema),
>           PartitionSpec.unpartitioned(),
>           Map(TableProperties.WRITE_NEW_DATA_LOCATION -> path).asJava,
>           path)
>         data.write.format("iceberg").mode("overwrite").save(path)
>         table.refresh()
>         val df = spark.read.format("iceberg").load(path)
>         val dfX = df.as("x")
>         val dfY = df.as("y")
>         val join = dfX.filter(dfX("id") > 0).join(dfY, "id")
>         join.explain("extended")
>         assert(join.count() == 2)
>       }
>     }
>   }
> }
> {code}
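> To confirm that the rule actually dropped the project, the executed plan can be inspected right before the failing action (a diagnostic sketch reusing the {{join}} DataFrame above, not part of the original repro):
> {code:java}
> import org.apache.spark.sql.execution.ProjectExec
>
> // Count the ProjectExec nodes left in the physical plan. With
> // spark.sql.execution.removeRedundantProjects enabled, the project that
> // would convert the DSv2 scan's rows to UnsafeRow has been removed.
> val projects = join.queryExecution.executedPlan.collect { case p: ProjectExec => p }
> println(s"ProjectExec nodes in executed plan: ${projects.length}")
> {code}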
> Stack trace:
> {noformat}
> [info] - RemoveRedundantProjects removes non-redundant projects *** FAILED ***
> [info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 4) (xeroxms100.northamerica.corp.microsoft.com executor driver): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
> [info]  at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
> [info]  at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
> [info]  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
> [info]  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
> [info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]  at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]  at 

[jira] [Commented] (SPARK-35287) RemoveRedundantProjects removes non-redundant projects

2021-05-03 Thread Hyukjin Kwon (Jira)


[ https://issues.apache.org/jira/browse/SPARK-35287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17338291#comment-17338291 ]

Hyukjin Kwon commented on SPARK-35287:
--

cc [~aokolnychyi] FYI
