Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/14266#discussion_r73616369
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
---
@@ -576,4 +576,605 @@ class AggregateBenchmark extends BenchmarkBase {
benchmark.run()
}
-}
+ // This test does not do any benchmark, instead it produces generated
code for vectorized
+ // and row-based hashmaps.
+ ignore("generated code comparison for vectorized vs. rowbased") {
+ val N = 20 << 23
+
+ sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
+ sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max",
"30")
+
+ sparkSession.range(N)
+ .selectExpr(
+ "id & 1023 as k1",
+ "cast (id & 1023 as string) as k2")
+ .createOrReplaceTempView("test")
+
+ // dataframe/query
+ val query = sparkSession.sql("select count(k1), sum(k1) from test
group by k1, k2")
+
+ // vectorized
+ sparkSession.conf.set("spark.sql.codegen.aggregate.map.enforce.impl",
"vectorized")
+ query.queryExecution.debug.codegen()
+
+ // row based
+ sparkSession.conf.set("spark.sql.codegen.aggregate.map.enforce.impl",
"rowbased")
+ query.queryExecution.debug.codegen()
+ }
+
+ ignore("1 key field, 1 value field, distinct linear keys") {
+ val N = 20 << 22;
+
+ var timeStart: Long = 0L
+ var timeEnd: Long = 0L
+ var nsPerRow: Long = 0L
+ var i = 0
+ sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
+ sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max",
"30")
+
+ // scalastyle:off
+ println(Benchmark.getJVMOSInfo())
+ println(Benchmark.getProcessorName())
+ printf("%20s %20s %20s %20s\n", "Num. Distinct Keys", "No Fast
Hashmap",
+ "Vectorized", "Row-based")
+ // scalastyle:on
+
+ val modes = List("skip", "vectorized", "rowbased")
+
+ while (i < 15) {
+ val results = modes.map(mode => {
+
sparkSession.conf.set("spark.sql.codegen.aggregate.map.enforce.impl", mode)
+ var j = 0
+ var minTime: Long = 1000
+ while (j < 5) {
+ System.gc()
+ sparkSession.range(N)
+ .selectExpr(
+ "id & " + ((1 << i) - 1) + " as k0")
+ .createOrReplaceTempView("test")
+ timeStart = System.nanoTime
+ sparkSession.sql("select sum(k0)" +
+ " from test group by k0").collect()
+ timeEnd = System.nanoTime
+ nsPerRow = (timeEnd - timeStart) / N
+ if (j > 1 && minTime > nsPerRow) minTime = nsPerRow
+ j += 1
+ }
+ minTime
+ })
+ printf("%20s %20s %20s %20s\n", 1 << i, results(0), results(1),
results(2))
+ i += 1
+ }
+ printf("Unit: ns/row\n")
+
+ /*
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_91-b14 on Mac OS X 10.11.5
+ Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
+
+ Num. Distinct Keys No Fast Hashmap Vectorized
Row-based
+ 1 21 13
11
+ 2 23 14
13
+ 4 23 14
14
+ 8 23 14
14
+ 16 23 12
13
+ 32 24 12
13
+ 64 24 14
16
+ 128 24 14
13
+ 256 25 14
14
+ 512 25 16
14
+ 1024 25 16
15
+ 2048 26 12
15
+ 4096 27 15
15
+ 8192 33 16
15
+ 16384 34 15
15
+ Unit: ns/row
+ */
+ }
+
+ ignore("1 key field, 1 value field, distinct random keys") {
+ val N = 20 << 22;
+
+ var timeStart: Long = 0L
+ var timeEnd: Long = 0L
+ var nsPerRow: Long = 0L
+ var i = 0
+ sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
+ sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max",
"30")
+
+ // scalastyle:off
+ println(Benchmark.getJVMOSInfo())
+ println(Benchmark.getProcessorName())
+ printf("%20s %20s %20s %20s\n", "Num. Distinct Keys", "No Fast
Hashmap",
+ "Vectorized", "Row-based")
+ // scalastyle:on
+
+ val modes = List("skip", "vectorized", "rowbased")
+
+ while (i < 15) {
+ val results = modes.map(mode => {
+
sparkSession.conf.set("spark.sql.codegen.aggregate.map.enforce.impl", mode)
+ var j = 0
+ var minTime: Long = 1000
+ while (j < 5) {
+ System.gc()
+ sparkSession.range(N)
+ .selectExpr(
+ "cast(floor(rand() * " + (1 << i) + ") as long) as k0")
+ .createOrReplaceTempView("test")
+ timeStart = System.nanoTime
+ sparkSession.sql("select sum(k0)" +
+ " from test group by k0").collect()
+ timeEnd = System.nanoTime
+ nsPerRow = (timeEnd - timeStart) / N
+ if (j > 1 && minTime > nsPerRow) minTime = nsPerRow
+ j += 1
+ }
+ minTime
+ })
+ printf("%20s %20s %20s %20s\n", 1 << i, results(0), results(1),
results(2))
+ i += 1
+ }
+ printf("Unit: ns/row\n")
+
+ /*
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_91-b14 on Mac OS X 10.11.5
+ Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
+
+ Num. Distinct Keys No Fast Hashmap Vectorized
Row-based
+ 1 32 9
13
+ 2 39 16
22
+ 4 39 14
23
+ 8 39 13
22
+ 16 38 13
20
+ 32 38 13
20
+ 64 38 13
20
+ 128 37 16
21
+ 256 36 17
22
+ 512 38 17
21
+ 1024 39 18
21
+ 2048 41 18
21
+ 4096 44 18
22
+ 8192 49 20
23
+ 16384 52 23
25
+ Unit: ns/row
+ */
+ }
+
+ ignore("1 key field, varying value fields, 16 linear distinct keys") {
+ val N = 20 << 22;
+
+ var timeStart: Long = 0L
+ var timeEnd: Long = 0L
+ var nsPerRow: Long = 0L
+ var i = 1
+ sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
+ sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max",
"30")
+
+ // scalastyle:off
+ println(Benchmark.getJVMOSInfo())
+ println(Benchmark.getProcessorName())
+ printf("%20s %20s %20s %20s\n", "Num. Value Fields", "No Fast Hashmap",
+ "Vectorized", "Row-based")
+ // scalastyle:on
+
+ val modes = List("skip", "vectorized", "rowbased")
+
+ while (i < 11) {
+ val results = modes.map(mode => {
+
sparkSession.conf.set("spark.sql.codegen.aggregate.map.enforce.impl", mode)
+ var j = 0
+ var minTime: Long = 1000
+ while (j < 5) {
+ System.gc()
+ sparkSession.range(N)
+ .selectExpr("id & " + 15 + " as k0")
+ .createOrReplaceTempView("test")
+ timeStart = System.nanoTime
+ sparkSession.sql("select " + List.range(0, i).map(x => "sum(k" +
0 + ")").mkString(",") +
+ " from test group by k0").collect()
+ timeEnd = System.nanoTime
+ nsPerRow = (timeEnd - timeStart) / N
+ if (j > 1 && minTime > nsPerRow) minTime = nsPerRow
+ j += 1
+ }
+ minTime
+ })
+ printf("%20s %20s %20s %20s\n", i, results(0), results(1),
results(2))
+ i += 1
+ }
+ printf("Unit: ns/row\n")
+
+ /*
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_91-b14 on Mac OS X 10.11.5
+ Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
+
+ Num. Value Fields No Fast Hashmap Vectorized
Row-based
+ 1 24 15
12
+ 2 25 24
14
+ 3 29 25
17
+ 4 31 32
22
+ 5 33 40
24
+ 6 36 36
27
+ 7 38 44
28
+ 8 47 50
32
+ 9 52 55
37
+ 10 59 59
45
+ Unit: ns/row
+ */
+ }
+
+ ignore("varying key fields, 1 value field, 16 linear distinct keys") {
+ val N = 20 << 22;
+
+ var timeStart: Long = 0L
+ var timeEnd: Long = 0L
+ var nsPerRow: Long = 0L
+ var i = 1
+ sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
+ sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max",
"30")
+
+ // scalastyle:off
+ println(Benchmark.getJVMOSInfo())
+ println(Benchmark.getProcessorName())
+ printf("%20s %20s %20s %20s\n", "Num. Key Fields", "No Fast Hashmap",
+ "Vectorized", "Row-based")
+ // scalastyle:on
+
+ val modes = List("skip", "vectorized", "rowbased")
+
+ while (i < 11) {
+ val results = modes.map(mode => {
+
sparkSession.conf.set("spark.sql.codegen.aggregate.map.enforce.impl", mode)
+ var j = 0
+ var minTime: Long = 1000
+ while (j < 5) {
+ System.gc()
+ val s = "id & " + 15 + " as k"
+ sparkSession.range(N)
+ .selectExpr(List.range(0, i).map(x => s + x): _*)
+ .createOrReplaceTempView("test")
+ timeStart = System.nanoTime
+ sparkSession.sql("select sum(k0)" +
+ " from test group by " + List.range(0, i).map(x => "k" +
x).mkString(",")).collect()
+ timeEnd = System.nanoTime
+ nsPerRow = (timeEnd - timeStart) / N
+ if (j > 1 && minTime > nsPerRow) minTime = nsPerRow
+ j += 1
+ }
+ minTime
+ })
+ printf("%20s %20s %20s %20s\n", i, results(0), results(1),
results(2))
+ i += 1
+ }
+ printf("Unit: ns/row\n")
+
+ /*
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_91-b14 on Mac OS X 10.11.5
+ Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
+
+ Num. Key Fields No Fast Hashmap Vectorized
Row-based
+ 1 24 15
13
+ 2 31 20
14
+ 3 37 22
17
+ 4 46 26
18
+ 5 53 27
20
+ 6 61 29
23
+ 7 69 36
25
+ 8 78 37
27
+ 9 88 43
30
+ 10 92 45
33
+ Unit: ns/row
+ */
+ }
+
+ ignore("varying key fields, varying value field, 16 linear distinct
keys") {
--- End diff --
The performance difference between column-based and row-based comes from cache
locality. Could you increase the number of distinct keys to make sure that not
all the keys/values fit in the L1 cache? For example, 4k. We could also
increase that to 64k in the first two cases (single key, single value).
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]