[jira] [Commented] (SPARK-45282) Join loses records for cached datasets
[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808393#comment-17808393 ]

Rob Russo commented on SPARK-45282:
-----------------------------------

Is it possible that this also affects Spark 3.3.2? I have an application that has been running on Spark 3.3.2 with AQE enabled. When I upgraded to 3.5.0 I immediately ran into the issue in this ticket. However, when I looked more closely I found that for one particular type of report the issue was still present even after rolling back to 3.3.2 with AQE enabled. On both 3.3.2 and 3.5.0, disabling AQE fixed the problem.

> Join loses records for cached datasets
> --------------------------------------
>
>                 Key: SPARK-45282
>                 URL: https://issues.apache.org/jira/browse/SPARK-45282
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.1, 3.5.0
>         Environment: spark 3.4.1 on apache hadoop 3.3.6 or kubernetes 1.26 or databricks 13.3
>            Reporter: koert kuipers
>            Assignee: Emil Ejbyfeldt
>            Priority: Blocker
>              Labels: CorrectnessBug, correctness, pull-request-available
>             Fix For: 3.4.2
>
>
> we observed this issue on spark 3.4.1 but it is also present on 3.5.0. it is
> not present on spark 3.3.1.
> it only shows up in a distributed environment. i cannot replicate it in a unit test.
> however i did get it to show up on a hadoop cluster, kubernetes, and on
> databricks 13.3
> the issue is that records are dropped when two cached dataframes are joined.
> it seems that in spark 3.4.1 some Exchanges are dropped from the query plan as an
> optimization, while in spark 3.3.1 these Exchanges are still present. it seems
> to be an issue with AQE with canChangeCachedPlanOutputPartitioning=true.
> to reproduce on a distributed cluster these settings are needed:
> {code:java}
> spark.sql.adaptive.advisoryPartitionSizeInBytes 33554432
> spark.sql.adaptive.coalescePartitions.parallelismFirst false
> spark.sql.adaptive.enabled true
> spark.sql.optimizer.canChangeCachedPlanOutputPartitioning true {code}
> code using scala to reproduce is:
> {code:java}
> import java.util.UUID
> import org.apache.spark.sql.functions.col
> import spark.implicits._
> val data = (1 to 100).toDS().map(i =>
>   UUID.randomUUID().toString).persist()
> val left = data.map(k => (k, 1))
> val right = data.map(k => (k, k)) // if i change this to k => (k, 1) it works!
> println("number of left " + left.count())
> println("number of right " + right.count())
> println("number of (left join right) " +
>   left.toDF("key", "value1").join(right.toDF("key", "value2"), "key").count()
> )
> val left1 = left
>   .toDF("key", "value1")
>   .repartition(col("key")) // comment out this line to make it work
>   .persist()
> println("number of left1 " + left1.count())
> val right1 = right
>   .toDF("key", "value2")
>   .repartition(col("key")) // comment out this line to make it work
>   .persist()
> println("number of right1 " + right1.count())
> println("number of (left1 join right1) " + left1.join(right1,
>   "key").count()) // this gives incorrect result{code}
> this produces the following output:
> {code:java}
> number of left 100
> number of right 100
> number of (left join right) 100
> number of left1 100
> number of right1 100
> number of (left1 join right1) 859531 {code}
> note that the last number (the incorrect one) actually varies depending on
> settings, cluster size, etc.
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
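
A minimal sketch of the workaround mentioned in the comment above, assuming it is run in spark-shell (where `spark` is the active SparkSession) before any dataset is cached. Disabling AQE is what was reported to help in this thread. The commented-out alternative, turning off only `canChangeCachedPlanOutputPartitioning`, is an untested assumption drawn from the issue description and is not confirmed anywhere in this ticket.

```scala
// Run in spark-shell before creating or caching any datasets.
// Workaround reported in this thread: turn off adaptive query execution (AQE).
spark.conf.set("spark.sql.adaptive.enabled", "false")

// Narrower alternative (assumption, not verified in this ticket): keep AQE on
// but stop it from changing the output partitioning of cached plans.
// spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false")
```

Either setting can also be passed at launch with `--conf`, in the same way the reproduction settings are passed. Turning AQE off gives up its partition coalescing, so this is a stopgap until a release with the fix is in use.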
[jira] [Commented] (SPARK-45282) Join loses records for cached datasets
[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784320#comment-17784320 ]

Emil Ejbyfeldt commented on SPARK-45282:
----------------------------------------

Created [https://github.com/apache/spark/pull/43729] to backport the fix to 3.4. In my manual test it solved the reproduction in this ticket.
[jira] [Commented] (SPARK-45282) Join loses records for cached datasets
[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784115#comment-17784115 ]

koert kuipers commented on SPARK-45282:
---------------------------------------

it does look like the same issue, and partitioning being the cause makes sense too
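
To illustrate why partitioning could plausibly be the cause, here is a toy model in plain Scala (no Spark involved). It is a sketch under stated assumptions, not Spark's implementation and not a reproduction of this bug; the names `PartitionwiseJoinSketch`, `partition` and `joinCount` are hypothetical. It only shows that a join which pairs partition i of one side with partition i of the other, without a shuffle in between, finds every match only when both sides were partitioned by the same function.

```scala
// Toy model (plain Scala, no Spark): a partition-wise join only pairs rows
// that sit at the same partition index on both sides.
object PartitionwiseJoinSketch {
  // Hypothetical helper: assign each (key, value) row to one of `n` partitions
  // using the supplied index function.
  def partition[V](rows: Seq[(String, V)], n: Int)(index: String => Int): Vector[Seq[(String, V)]] =
    Vector.tabulate(n)(p => rows.filter { case (k, _) => Math.floorMod(index(k), n) == p })

  // Join partition i of the left side with partition i of the right side only,
  // and count the matching pairs.
  def joinCount[A, B](left: Vector[Seq[(String, A)]], right: Vector[Seq[(String, B)]]): Int =
    left.zip(right).map { case (l, r) =>
      l.map { case (k, _) => r.count(_._1 == k) }.sum
    }.sum

  val keys: Seq[String] = (1 to 100).map(i => s"key-$i")
  val leftRows: Seq[(String, Int)] = keys.map(k => (k, 1))
  val rightRows: Seq[(String, String)] = keys.map(k => (k, k))

  // Both sides partitioned by the same function: every key meets its match.
  val coPartitioned: Int =
    joinCount(partition(leftRows, 4)(_.hashCode), partition(rightRows, 4)(_.hashCode))

  // Right side partitioned differently (by key length), with no re-shuffle
  // before the join: keys whose match landed in another partition are missed.
  val mismatched: Int =
    joinCount(partition(leftRows, 4)(_.hashCode), partition(rightRows, 4)(_.length))

  def main(args: Array[String]): Unit = {
    println(s"co-partitioned join count: $coPartitioned")
    println(s"mismatched join count: $mismatched")
  }
}
```

When both sides use the same partitioning, all 100 keys match. When one side is partitioned differently and nothing re-shuffles it, matches are lost, which is the kind of effect a wrongly removed Exchange could have.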
[jira] [Commented] (SPARK-45282) Join loses records for cached datasets
[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784109#comment-17784109 ]

Emil Ejbyfeldt commented on SPARK-45282:
----------------------------------------

The code reproducing the bug looks quite similar to https://issues.apache.org/jira/browse/SPARK-45592. I wonder if the fix for that might also have solved this bug, as I could not reproduce this issue on a build from the master branch.
[jira] [Commented] (SPARK-45282) Join loses records for cached datasets
[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781464#comment-17781464 ]

Dongjoon Hyun commented on SPARK-45282:
---------------------------------------

Thank you for sharing, [~koert].
[jira] [Commented] (SPARK-45282) Join loses records for cached datasets
[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781171#comment-17781171 ]

koert kuipers commented on SPARK-45282:
---------------------------------------

last time i checked this issue was still present in 3.4/3.5/master
[jira] [Commented] (SPARK-45282) Join loses records for cached datasets
[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781168#comment-17781168 ]

Dongjoon Hyun commented on SPARK-45282:
---------------------------------------

Let me increase the priority to `Blocker` for now.
[jira] [Commented] (SPARK-45282) Join loses records for cached datasets
[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781167#comment-17781167 ]

Dongjoon Hyun commented on SPARK-45282:
---------------------------------------

Hi, All.

Is this correctness issue still valid in branch-3.4/3.5/master? Or, I'm wondering if this exists in the reported platform, Databricks 13.3?
[jira] [Commented] (SPARK-45282) Join loses records for cached datasets
[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769893#comment-17769893 ]

koert kuipers commented on SPARK-45282:
---------------------------------------

yes i can reproduce it.

master branch on commit:
{code:java}
commit 7e8aafd2c0f1f6fcd03a69afe2b85fd3fda95d20 (HEAD -> master, upstream/master)
Author: lanmengran1
Date:   Tue Sep 26 21:01:02 2023 -0500

    [SPARK-45334][SQL] Remove misleading comment in parquetSchemaConverter
{code}
i built spark for k8s using:
{code:java}
$ dev/make-distribution.sh --name kubernetes --tgz -Pkubernetes -Phadoop-cloud
{code}
created a docker container using the Dockerfile provided in resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile

launched a pod and a shell inside it:
{code:java}
185@proxy:~/work-dir$ export SPARK_LOCAL_HOSTNAME=$(hostname -i)
185@proxy:~/work-dir$ export SPARK_PUBLIC_DNS=$(hostname -i)
185@proxy:~/work-dir$ /opt/spark/bin/spark-shell --master k8s://https://kubernetes.default:443 --deploy-mode client --num-executors 4 --executor-memory 2G --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.kubernetes.namespace=default --conf spark.sql.adaptive.coalescePartitions.parallelismFirst=false --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=33554432 --conf spark.sql.optimizer.canChangeCachedPlanOutputPartitioning=true --conf spark.kubernetes.container.image=.dkr.ecr.us-east-1.amazonaws.com/spark:4.0.0-SNAPSHOT
23/09/28 03:44:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.0.0-SNAPSHOT
      /_/

Using Scala version 2.13.11 (OpenJDK 64-Bit Server VM, Java 21)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context Web UI available at http://10.177.71.94:4040
Spark context available as 'sc' (master = k8s://https://kubernetes.default:443, app id = spark-5ab0957571944828866a2f23068ff180).
Spark session available as 'spark'.

scala> :paste
// Entering paste mode (ctrl-D to finish)

import java.util.UUID
import org.apache.spark.sql.functions.col
import spark.implicits._

val data = (1 to 100).toDS().map(i => UUID.randomUUID().toString).persist()
val left = data.map(k => (k, 1))
val right = data.map(k => (k, k)) // if i change this to k => (k, 1) it works!
println("number of left " + left.count())
println("number of right " + right.count())
println("number of (left join right) " +
  left.toDF("key", "vertex").join(right.toDF("key", "state"), "key").count()
)
val left1 = left
  .toDF("key", "vertex")
  .repartition(col("key")) // comment out this line to make it work
  .persist()
println("number of left1 " + left1.count())
val right1 = right
  .toDF("key", "state")
  .repartition(col("key")) // comment out this line to make it work
  .persist()
println("number of right1 " + right1.count())
println("number of (left1 join right1) " + left1.join(right1, "key").count()) // this gives incorrect result

// Exiting paste mode, now interpreting.

23/09/28 03:45:30 WARN TaskSetManager: Stage 0 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB.
23/09/28 03:45:34 WARN TaskSetManager: Stage 1 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB.
number of left 100
23/09/28 03:45:36 WARN TaskSetManager: Stage 4 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB.
number of right 100
23/09/28 03:45:39 WARN TaskSetManager: Stage 7 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB.
23/09/28 03:45:40 WARN TaskSetManager: Stage 8 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB.
number of (left join right) 100
23/09/28 03:45:45 WARN TaskSetManager: Stage 16 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB.
number of left1 100
23/09/28 03:45:48 WARN TaskSetManager: Stage 24 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB.
number of right1 100
number of (left1 join right1) 850735
import java.util.UUID
import
[jira] [Commented] (SPARK-45282) Join loses records for cached datasets
[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769383#comment-17769383 ]

XiDuo You commented on SPARK-45282:
-----------------------------------

I cannot reproduce this issue on the master branch (4.0.0). [~koert], have you tried the master branch?
[jira] [Commented] (SPARK-45282) Join loses records for cached datasets
[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768399#comment-17768399 ]

Yuming Wang commented on SPARK-45282:
-------------------------------------

cc [~ulysses] [~cloud_fan]
[jira] [Commented] (SPARK-45282) Join loses records for cached datasets
[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768123#comment-17768123 ]

koert kuipers commented on SPARK-45282:
---------------------------------------

after reverting SPARK-41048 the issue went away. so i think this is the cause.

> Join loses records for cached datasets
> --------------------------------------
>
>                 Key: SPARK-45282
>                 URL: https://issues.apache.org/jira/browse/SPARK-45282
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.1, 3.5.0
>         Environment: spark 3.4.1 on apache hadoop 3.3.6 or kubernetes 1.26 or
> databricks 13.3
>            Reporter: koert kuipers
>            Priority: Major
>              Labels: CorrectnessBug, correctness
>
> we observed this issue on spark 3.4.1 but it is also present on 3.5.0. it is
> not present on spark 3.3.1.
> it only shows up in a distributed environment. i cannot replicate it in a unit test.
> however i did get it to show up on a hadoop cluster, kubernetes, and on
> databricks 13.3
> the issue is that records are dropped when two cached dataframes are joined.
> it seems that in spark 3.4.1 some Exchanges are dropped from the query plan as an
> optimization, while in spark 3.3.1 these Exchanges are still present. it seems
> to be an issue with AQE with canChangeCachedPlanOutputPartitioning=true.
> to reproduce on a distributed cluster these settings are needed:
> {code:java}
> spark.sql.adaptive.advisoryPartitionSizeInBytes 33554432
> spark.sql.adaptive.coalescePartitions.parallelismFirst false
> spark.sql.adaptive.enabled true
> spark.sql.optimizer.canChangeCachedPlanOutputPartitioning true {code}
> code using scala 2.13 to reproduce is:
> {code:java}
> import java.util.UUID
> import org.apache.spark.sql.functions.col
> import spark.implicits._
> val data = (1 to 100).toDS().map(i =>
> UUID.randomUUID().toString).persist()
> val left = data.map(k => (k, 1))
> val right = data.map(k => (k, k)) // if i change this to k => (k, 1) it works!
> println("number of left " + left.count())
> println("number of right " + right.count())
> println("number of (left join right) " +
> left.toDF("key", "vertex").join(right.toDF("key", "state"), "key").count()
> )
> val left1 = left
> .toDF("key", "vertex")
> .repartition(col("key")) // comment out this line to make it work
> .persist()
> println("number of left1 " + left1.count())
> val right1 = right
> .toDF("key", "state")
> .repartition(col("key")) // comment out this line to make it work
> .persist()
> println("number of right1 " + right1.count())
> println("number of (left1 join right1) " + left1.join(right1,
> "key").count()) // this gives incorrect result{code}
> this produces the following output:
> {code:java}
> number of left 100
> number of right 100
> number of (left join right) 100
> number of left1 100
> number of right1 100
> number of (left1 join right1) 859531 {code}
> note that the last number (the incorrect one) actually varies depending on
> settings and cluster size etc.
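For anyone trying to confirm whether a cluster is affected, the following is a minimal sketch and not part of the original report. It assumes a spark-shell session with the settings listed in the report, and it reuses the left1 and right1 dataframes from the reproduction. The helper name checkJoinCount is hypothetical. The check relies on the join key being unique on both sides (true for the UUID keys in the reproduction), so a correct inner join should return as many rows as the left side. The explain() call prints the physical plan as planned before execution, which may help when comparing Exchange nodes between Spark versions; with AQE enabled the plan can still change at runtime.

{code:java}
import org.apache.spark.sql.DataFrame

// Hypothetical helper (not from the report): compares the inner-join row count
// with the left-side row count. Only meaningful when the join key is unique on
// both sides and both sides hold the same set of keys.
def checkJoinCount(left: DataFrame, right: DataFrame, key: String): Boolean = {
  val joined = left.join(right, key)
  // Print the physical plan so Exchange nodes can be inspected.
  joined.explain()
  val expected = left.count()
  val actual = joined.count()
  println(s"expected $expected rows, join returned $actual rows")
  expected == actual
}

// Usage with the dataframes defined in the reproduction:
// checkJoinCount(left1, right1, "key")
{code}

A result of false here would match the symptom described in the report. A result of true does not prove the cluster is unaffected, since the report notes that the incorrect count varies with settings and cluster size.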