GitHub user heary-cao commented on the issue:
https://github.com/apache/spark/pull/20415
@hvanhovell, thank you for reviewing it.
I tested the code change in this PR as follows.
**Code in `FileSourceScanExec.doExecute`:**
```scala
if (needsUnsafeRowConversion) {
  // Modified path: apply the UnsafeProjection and update numOutputRows in the same map.
  scan.mapPartitionsWithIndexInternal { (index, iter) =>
    val proj = UnsafeProjection.create(schema)
    proj.initialize(index)
    iter.map { r =>
      numOutputRows += 1
      proj(r)
    }
  }
} else {
  // Other path: project first, then update numOutputRows in a separate map.
  val scanOther = scan.mapPartitionsWithIndexInternal { (index, iter) =>
    val proj = UnsafeProjection.create(schema)
    proj.initialize(index)
    iter.map(proj)
  }
  scanOther.map { r =>
    numOutputRows += 1
    r
  }
}
```
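Here is a minimal plain-Scala sketch of the difference between the two branches, outside of Spark. It is not how Spark implements either piece. A hypothetical `project` function stands in for `UnsafeProjection`, and a `java.util.concurrent.atomic.LongAdder` stands in for the `numOutputRows` SQLMetric. The fused version updates the counter inside the same `map` that applies the projection. The separate version layers a second `map` on top only to count rows.

```scala
import java.util.concurrent.atomic.LongAdder

object RowCountingSketch {
  // Hypothetical stand-in for the per-row UnsafeProjection.
  def project(r: Int): Int = r * 2

  // Fused: project each row and bump the counter in a single map.
  def fused(rows: Iterator[Int], counter: LongAdder): Iterator[Int] =
    rows.map { r =>
      counter.increment()
      project(r)
    }

  // Separate: project in one map, then count in a second map layered on top.
  def separate(rows: Iterator[Int], counter: LongAdder): Iterator[Int] =
    rows.map(project).map { r =>
      counter.increment()
      r
    }
}
```

Both variants yield the same rows and the same count. The fused one wraps the iterator once instead of twice, so each row passes through one fewer function call. That is presumably the per-row overhead this change aims to cut.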
**Start spark-shell:**
> ./spark-shell --executor-memory 15G --total-executor-cores 1 --conf spark.executor.cores=1

**Test code:**
> val df4 = (0 until 500000).map(i => (i % 2, i % 3, i % 4, i % 5, i % 6, i % 7, i % 8)).toDF("i2","i3","i4","i5","i6","i7","i8")
> df4.write.format("parquet").partitionBy("i2", "i3", "i4").bucketBy(8, "i5","i6","i7","i8").saveAsTable("table500000")
>
> def runBenchmark(name: String, cardinality: Int)(f: => Unit): Unit = {
> val startTime = System.nanoTime
> (0 to cardinality).foreach(i => f)
> val endTime = System.nanoTime
> println(s"Time taken in $name: " + (endTime - startTime).toDouble / 1000000000 + " seconds")
> }
>
> def benchmark(name: String, card: Int)(f: => Unit): Unit = {
> (0 to card).foreach(i => f)
> }
>
> After the change to FileSourceScanExec:
> benchmark("File SourceScan Exec", 2){
> runBenchmark("After modified File SourceScan Exec ", 200) {
> spark.conf.set("spark.sql.codegen.maxFields", 2)
> spark.conf.set("spark.sql.parquet.enableVectorizedReader", true)
> spark.sql("select * from table500000").count()
> }
> }
>
> Before the change to FileSourceScanExec:
> benchmark("File SourceScan Exec", 2){
> runBenchmark("Before modified File SourceScan Exec ", 200) {
> spark.conf.set("spark.sql.codegen.maxFields", 2)
> spark.conf.set("spark.sql.parquet.enableVectorizedReader", false)
> spark.sql("select * from table500000").count()
> }
> }
>
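In `runBenchmark` above, `(0 to cardinality)` is an inclusive range, so the body runs `cardinality + 1` times (201 runs for `cardinality = 200`). The outer `benchmark(..., 2)` likewise runs three times, which lines up with the three timed runs reported in each results table. Below is a minimal sketch of a timing helper that runs exactly `n` iterations and returns the elapsed seconds instead of printing them. `BenchmarkSketch` and `timeIterations` are hypothetical names, not Spark APIs.

```scala
object BenchmarkSketch {
  // Runs `f` exactly `n` times (`until` is exclusive) and returns wall-clock seconds.
  def timeIterations(n: Int)(f: => Unit): Double = {
    val start = System.nanoTime()
    (0 until n).foreach(_ => f)
    (System.nanoTime() - start).toDouble / 1e9
  }
}
```

Because the elapsed time is returned rather than printed, the before and after timings can be collected and compared programmatically.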
**Test results:**
>
> 20 iterations:
>
> | | 1st run (s) | 2nd run (s) | 3rd run (s) | Avg (s) |
> |---|---|---|---|---|
> | Before change | 10.97 | 10.83 | 11.05 | 10.95 |
> | After change | 9.33 | 9.61 | 9.32 | 9.42 |
>
> 100 iterations:
>
> | | 1st run (s) | 2nd run (s) | 3rd run (s) | Avg (s) |
> |---|---|---|---|---|
> | Before change | 51.74 | 52.80 | 71.88 | 58.80 |
> | After change | 47.24 | 46.18 | 48.92 | 47.45 |
>
> 200 iterations:
>
> | | 1st run (s) | 2nd run (s) | 3rd run (s) | Avg (s) |
> |---|---|---|---|---|
> | Before change | 236.85 | 325.97 | 395.69 | 319.50 |
> | After change | 208.90 | 244.13 | 261.18 | 238.07 |
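The following sketch expresses the results in relative terms. It recomputes each mean from the three reported runs and the percentage reduction in average time from before to after. The timings are copied from the results tables, and `ResultsSketch` is only an illustrative name. The reductions come to roughly 14%, 19%, and 25% for 20, 100, and 200 iterations.

```scala
object ResultsSketch {
  // (iterations, before-change runs in seconds, after-change runs in seconds)
  val results: Seq[(Int, Seq[Double], Seq[Double])] = Seq(
    (20, Seq(10.97, 10.83, 11.05), Seq(9.33, 9.61, 9.32)),
    (100, Seq(51.74, 52.80, 71.88), Seq(47.24, 46.18, 48.92)),
    (200, Seq(236.85, 325.97, 395.69), Seq(208.90, 244.13, 261.18))
  )

  def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  // Percentage reduction in average wall time from "before" to "after".
  def reductionPct(before: Seq[Double], after: Seq[Double]): Double =
    (mean(before) - mean(after)) / mean(before) * 100

  def main(args: Array[String]): Unit =
    results.foreach { case (n, before, after) =>
      println(f"$n%3d iterations: ${mean(before)}%.2f s -> ${mean(after)}%.2f s (${reductionPct(before, after)}%.1f%% less)")
    }
}
```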
Thanks.