Github user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/22661#discussion_r223220438
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala ---
@@ -19,229 +19,164 @@ package org.apache.spark.sql.execution.benchmark
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType
/**
* Benchmark to measure performance for joins.
- * To run this:
- * build/sbt "sql/test-only *benchmark.JoinBenchmark"
- *
- * Benchmarks in this file are skipped in normal builds.
+ * To run this benchmark:
+ * {{{
+ * 1. without sbt:
+ * bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar>
+ * 2. build/sbt "sql/test:runMain <this class>"
+ * 3. generate result:
+ * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ * Results will be written to "benchmarks/JoinBenchmark-results.txt".
+ * }}}
*/
-class JoinBenchmark extends BenchmarkWithCodegen {
+object JoinBenchmark extends SqlBasedBenchmark {
- ignore("broadcast hash join, long key") {
+ def broadcastHashJoinLongKey(): Unit = {
val N = 20 << 20
val M = 1 << 16
- val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
- runBenchmark("Join w long", N) {
- val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
+ val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as string) as v"))
+ codegenBenchmark("Join w long", N) {
+ val df = spark.range(N).join(dim, (col("id") % M) === col("k"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}
-
-    /*
-    Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
-    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    Join w long:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    -------------------------------------------------------------------------------------------
-    Join w long codegen=false                 3002 / 3262          7.0         143.2       1.0X
-    Join w long codegen=true                   321 /  371         65.3          15.3       9.3X
-    */
}
- ignore("broadcast hash join, long key with duplicates") {
+
+ def broadcastHashJoinLongKeyWithDuplicates(): Unit = {
val N = 20 << 20
val M = 1 << 16
- val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
- runBenchmark("Join w long duplicated", N) {
- val dim = broadcast(sparkSession.range(M).selectExpr("cast(id/10 as long) as k"))
- val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
+ codegenBenchmark("Join w long duplicated", N) {
+ val dim = broadcast(spark.range(M).selectExpr("cast(id/10 as long) as k"))
+ val df = spark.range(N).join(dim, (col("id") % M) === col("k"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}
-
-    /*
-    *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
-    *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    *Join w long duplicated:            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    *-------------------------------------------------------------------------------------------
-    *Join w long duplicated codegen=false     3446 / 3478          6.1         164.3       1.0X
-    *Join w long duplicated codegen=true       322 /  351         65.2          15.3      10.7X
-    */
}
- ignore("broadcast hash join, two int key") {
+ def broadcastHashJoinTwoIntKey(): Unit = {
val N = 20 << 20
val M = 1 << 16
- val dim2 = broadcast(sparkSession.range(M)
+ val dim2 = broadcast(spark.range(M)
.selectExpr("cast(id as int) as k1", "cast(id as int) as k2",
"cast(id as string) as v"))
- runBenchmark("Join w 2 ints", N) {
- val df = sparkSession.range(N).join(dim2,
+ codegenBenchmark("Join w 2 ints", N) {
+ val df = spark.range(N).join(dim2,
(col("id") % M).cast(IntegerType) === col("k1")
&& (col("id") % M).cast(IntegerType) === col("k2"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}
-
-    /*
-    *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
-    *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    *Join w 2 ints:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    *-------------------------------------------------------------------------------------------
-    *Join w 2 ints codegen=false              4426 / 4501          4.7         211.1       1.0X
-    *Join w 2 ints codegen=true                791 /  818         26.5          37.7       5.6X
-    */
}
- ignore("broadcast hash join, two long key") {
+ def broadcastHashJoinTwoLongKey(): Unit = {
val N = 20 << 20
val M = 1 << 16
- val dim3 = broadcast(sparkSession.range(M)
+ val dim3 = broadcast(spark.range(M)
.selectExpr("id as k1", "id as k2", "cast(id as string) as v"))
- runBenchmark("Join w 2 longs", N) {
- val df = sparkSession.range(N).join(dim3,
+ codegenBenchmark("Join w 2 longs", N) {
+ val df = spark.range(N).join(dim3,
(col("id") % M) === col("k1") && (col("id") % M) === col("k2"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}
-
-    /*
-    *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
-    *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    *Join w 2 longs:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    *-------------------------------------------------------------------------------------------
-    *Join w 2 longs codegen=false             5905 / 6123          3.6         281.6       1.0X
-    *Join w 2 longs codegen=true              2230 / 2529          9.4         106.3       2.6X
-    */
}
- ignore("broadcast hash join, two long key with duplicates") {
+ def broadcastHashJoinTwoLongKeyWithDuplicates(): Unit = {
val N = 20 << 20
val M = 1 << 16
- val dim4 = broadcast(sparkSession.range(M)
+ val dim4 = broadcast(spark.range(M)
.selectExpr("cast(id/10 as long) as k1", "cast(id/10 as long) as
k2"))
- runBenchmark("Join w 2 longs duplicated", N) {
- val df = sparkSession.range(N).join(dim4,
+ codegenBenchmark("Join w 2 longs duplicated", N) {
+ val df = spark.range(N).join(dim4,
(col("id") bitwiseAND M) === col("k1") && (col("id") bitwiseAND M)
=== col("k2"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}
-
-    /*
-    *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    *Join w 2 longs duplicated:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    *-------------------------------------------------------------------------------------------
-    *Join w 2 longs duplicated codegen=false  6420 / 6587          3.3         306.1       1.0X
-    *Join w 2 longs duplicated codegen=true   2080 / 2139         10.1          99.2       3.1X
-    */
}
- ignore("broadcast hash join, outer join long key") {
+
+ def broadcastHashJoinOuterJoinLongKey(): Unit = {
val N = 20 << 20
val M = 1 << 16
- val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
- runBenchmark("outer join w long", N) {
- val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"), "left")
+ val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as string) as v"))
+ codegenBenchmark("outer join w long", N) {
+ val df = spark.range(N).join(dim, (col("id") % M) === col("k"), "left")
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}
-
-    /*
-    *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
-    *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    *outer join w long:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    *-------------------------------------------------------------------------------------------
-    *outer join w long codegen=false          3055 / 3189          6.9         145.7       1.0X
-    *outer join w long codegen=true            261 /  276         80.5          12.4      11.7X
-    */
}
- ignore("broadcast hash join, semi join long key") {
+
+ def broadcastHashJoinSemiJoinLongKey(): Unit = {
val N = 20 << 20
val M = 1 << 16
- val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
- runBenchmark("semi join w long", N) {
- val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"), "leftsemi")
+ val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as string) as v"))
+ codegenBenchmark("semi join w long", N) {
+ val df = spark.range(N).join(dim, (col("id") % M) === col("k"), "leftsemi")
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}
-
-    /*
-    *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
-    *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    *semi join w long:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    *-------------------------------------------------------------------------------------------
-    *semi join w long codegen=false           1912 / 1990         11.0          91.2       1.0X
-    *semi join w long codegen=true             237 /  244         88.3          11.3       8.1X
-    */
}
- ignore("sort merge join") {
+ def sortMergeJoin(): Unit = {
val N = 2 << 20
- runBenchmark("merge join", N) {
- val df1 = sparkSession.range(N).selectExpr(s"id * 2 as k1")
- val df2 = sparkSession.range(N).selectExpr(s"id * 3 as k2")
+ codegenBenchmark("merge join", N) {
+ val df1 = spark.range(N).selectExpr(s"id * 2 as k1")
+ val df2 = spark.range(N).selectExpr(s"id * 3 as k2")
val df = df1.join(df2, col("k1") === col("k2"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
df.count()
}
-
-    /*
-    *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    *merge join:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    *-------------------------------------------------------------------------------------------
-    *merge join codegen=false                 1588 / 1880          1.3         757.1       1.0X
-    *merge join codegen=true                  1477 / 1531          1.4         704.2       1.1X
-    */
}
- ignore("sort merge join with duplicates") {
+ def sortMergeJoinWithDuplicates(): Unit = {
val N = 2 << 20
- runBenchmark("sort merge join", N) {
- val df1 = sparkSession.range(N)
+ codegenBenchmark("sort merge join with duplicates", N) {
+ val df1 = spark.range(N)
.selectExpr(s"(id * 15485863) % ${N*10} as k1")
- val df2 = sparkSession.range(N)
+ val df2 = spark.range(N)
.selectExpr(s"(id * 15485867) % ${N*10} as k2")
val df = df1.join(df2, col("k1") === col("k2"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
df.count()
}
-
-    /*
-    *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    *sort merge join:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    *-------------------------------------------------------------------------------------------
-    *sort merge join codegen=false            3626 / 3667          0.6        1728.9       1.0X
-    *sort merge join codegen=true             3405 / 3438          0.6        1623.8       1.1X
-    */
}
- ignore("shuffle hash join") {
- val N = 4 << 20
- sparkSession.conf.set("spark.sql.shuffle.partitions", "2")
- sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold",
"10000000")
- sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
- runBenchmark("shuffle hash join", N) {
- val df1 = sparkSession.range(N).selectExpr(s"id as k1")
- val df2 = sparkSession.range(N / 3).selectExpr(s"id * 3 as k2")
- val df = df1.join(df2, col("k1") === col("k2"))
- assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
- df.count()
+ def shuffleHashJoin(): Unit = {
+ val N: Long = 4 << 20
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",
--- End diff --
nit. Could you put `SQLConf.SHUFFLE_PARTITIONS.key` on the next line?
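
For reference, the wrapping being asked for would look roughly like the sketch below. The hunk is truncated right after the first config pair, so everything past that line (the two remaining config pairs and the benchmark body) is inferred from the removed `sparkSession.conf.set(...)` calls above rather than copied from the actual patch:

```scala
  def shuffleHashJoin(): Unit = {
    val N: Long = 4 << 20
    // Each config pair starts on its own line, per the suggestion above.
    withSQLConf(
        SQLConf.SHUFFLE_PARTITIONS.key -> "2",
        // Assumed: these two pairs mirror the old conf.set calls in the removed code.
        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10000000",
        SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
      codegenBenchmark("shuffle hash join", N) {
        val df1 = spark.range(N).selectExpr("id as k1")
        val df2 = spark.range(N / 3).selectExpr("id * 3 as k2")
        val df = df1.join(df2, col("k1") === col("k2"))
        assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
        df.count()
      }
    }
  }
```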
---