agrawaldevesh commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r467709017
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##########
@@ -1188,4 +1188,53 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
classOf[BroadcastNestedLoopJoinExec]))
}
}
+
+ test("SPARK-32399: Full outer shuffled hash join") {
+ val inputDFs = Seq(
+ // Test unique join key
+ (spark.range(10).selectExpr("id as k1"),
+ spark.range(30).selectExpr("id as k2"),
+ $"k1" === $"k2"),
+ // Test non-unique join key
+ (spark.range(10).selectExpr("id % 5 as k1"),
+ spark.range(30).selectExpr("id % 5 as k2"),
+ $"k1" === $"k2"),
+ // Test string join key
+ (spark.range(10).selectExpr("cast(id * 3 as string) as k1"),
+ spark.range(30).selectExpr("cast(id as string) as k2"),
+ $"k1" === $"k2"),
+ // Test build side at right
+ (spark.range(30).selectExpr("cast(id / 3 as string) as k1"),
+ spark.range(10).selectExpr("cast(id as string) as k2"),
+ $"k1" === $"k2"),
+ // Test NULL join key
+ (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"),
+ spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"),
+ $"k1" === $"k2"),
+ // Test multiple join keys
+ (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr(
+ "value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as
long) as k3"),
+ spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr(
+ "value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as
long) as k6"),
+ $"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6")
+ )
+ inputDFs.foreach { case (df1, df2, joinExprs) =>
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
Review comment:
You have used the phrase 'careful tuning' several times, but I am not fully
following: do you mean that the broadcast threshold should be chosen to be
smaller than the build side size, so as to NOT trigger a BHJ?
Perhaps add a comment above this line to explain to the reader why the
configs are chosen the way they are: for example, why shuffle partitions is 2,
etc.
Btw, is it worth testing with shuffle partitions = 1?
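A hypothetical sketch of the kind of comment I have in mind (the threshold
value is from the diff above; the PREFER_SORTMERGEJOIN and SHUFFLE_PARTITIONS
entries are my guesses at the truncated remainder of the withSQLConf block,
not code from this PR):
```scala
// Inside the test body of JoinSuite; withSQLConf comes from SQLTestUtils and
// SQLConf is org.apache.spark.sql.internal.SQLConf.
withSQLConf(
  // Keep the broadcast threshold (80 bytes) below the size of either input,
  // so the planner cannot fall back to a broadcast hash join.
  SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
  // Assumed: disable the sort-merge-join preference so the shuffled hash
  // join is actually selected once broadcast is ruled out.
  SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
  // Assumed: a small shuffle partition count keeps several rows in each
  // partition while still exercising more than one partition.
  SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
  // ... run the join and checkAnswer here ...
}
```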
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##########
@@ -1188,4 +1188,42 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
classOf[BroadcastNestedLoopJoinExec]))
}
}
+
+ test("SPARK-32399: Full outer shuffled hash join") {
Review comment:
Thanks for noticing the gap in the test coverage and explaining it to me! I
understand this better now.
But you see where I am coming from :-) I am simply concerned about data
corruption (more precisely, wrong result computation).
Ideally, we should first fix the test, land that fix, and then retest this PR
with the fixed tests. It seems like it shouldn't be too hard, nor cause any
merge conflicts, but it would delay landing this PR.
As a compromise, may I suggest the following:
- Make the test change described in SPARK-32577 locally and run the SQL tests
locally to verify that they pass.
- Run JoinSuite with code coverage enabled and ensure that all of the code
paths changed in this PR are covered.
I recently worked on a join-related feature and was humbled by how many bugs
I managed to write around nulls, strings, and their interaction with
downstream union queries :-D
I think the tests in JoinSuite are good, but I am more concerned about the
unknown unknowns, any regressions, and interactions of the join with other
operators like aggregations and unions.
This is not a blocker at all! Just due diligence and paranoia.
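To make the aggregations-and-unions point concrete, here is a hypothetical
sanity check (not part of this PR; it assumes JoinSuite's testImplicits,
checkAnswer, and import org.apache.spark.sql.internal.SQLConf) that feeds the
full outer join through a union and an aggregation, using the default
sort-merge-join plan as the reference:
```scala
val left = spark.range(10).selectExpr("id as k1")
val right = spark.range(30).selectExpr("id % 5 as k2")
val query = left.join(right, $"k1" === $"k2", "full_outer")
  .union(left.join(right, $"k1" === $"k2", "full_outer"))
  .groupBy($"k1").count()

// Reference answer: under default configs this plans as a sort merge join,
// since broadcast hash join does not support full outer joins here.
val expected = query.collect().toSeq

withSQLConf(
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
    SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
  // Same query, now planned as a shuffled hash join; results must agree.
  checkAnswer(query, expected)
}
```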
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##########
@@ -1188,4 +1188,53 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
classOf[BroadcastNestedLoopJoinExec]))
}
}
+
+ test("SPARK-32399: Full outer shuffled hash join") {
+ val inputDFs = Seq(
+ // Test unique join key
+ (spark.range(10).selectExpr("id as k1"),
+ spark.range(30).selectExpr("id as k2"),
+ $"k1" === $"k2"),
+ // Test non-unique join key
+ (spark.range(10).selectExpr("id % 5 as k1"),
+ spark.range(30).selectExpr("id % 5 as k2"),
+ $"k1" === $"k2"),
+ // Test string join key
+ (spark.range(10).selectExpr("cast(id * 3 as string) as k1"),
+ spark.range(30).selectExpr("cast(id as string) as k2"),
+ $"k1" === $"k2"),
+ // Test build side at right
+ (spark.range(30).selectExpr("cast(id / 3 as string) as k1"),
+ spark.range(10).selectExpr("cast(id as string) as k2"),
+ $"k1" === $"k2"),
+ // Test NULL join key
+ (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"),
+ spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"),
+ $"k1" === $"k2"),
+ // Test multiple join keys
+ (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr(
Review comment:
How about uncorrelated nulls too: 'i % 2 == 0' on the left and, say,
'i % 3 == 0' on the right? A sketch follows below.
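For concreteness, the suggested entry might look like this (a hypothetical
tuple in the style of the existing inputDFs entries; only the right side's
predicate differs from the NULL-key case above, and this is not code from the
PR):
```scala
// Test uncorrelated NULL join keys: the two sides null out rows using
// different predicates (i % 2 vs i % 3), so null positions do not line up.
(spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"),
  spark.range(30).map(i => if (i % 3 == 0) i else null).selectExpr("value as k2"),
  $"k1" === $"k2")
```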