Github user heary-cao commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    @hvanhovell, thank you for reviewing it.
    I tested the code change in this PR.
    
    **In FileSourceScanExec#doExecute:**
    
    ```scala
    if (needsUnsafeRowConversion) {
      scan.mapPartitionsWithIndexInternal { (index, iter) =>
        val proj = UnsafeProjection.create(schema)
        proj.initialize(index)
        iter.map { r =>
          numOutputRows += 1
          proj(r)
        }
      }
    } else {
      val scanOther = scan.mapPartitionsWithIndexInternal { (index, iter) =>
        val proj = UnsafeProjection.create(schema)
        proj.initialize(index)
        iter.map(proj)
      }

      scanOther.map { r =>
        numOutputRows += 1
        r
      }
    }
    ```
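    To make the shape of the change clearer outside Spark, here is a minimal plain-Scala sketch (hypothetical names, not the PR code) of the two iterator shapes being compared: incrementing the row counter inside the projecting map versus in a separate map over the already-projected iterator:

    ```scala
    // Minimal sketch, plain Scala: `proj` stands in for UnsafeProjection and
    // `numOutputRows` for the SQL metric. Hypothetical, not the PR code.
    var numOutputRows = 0L
    val proj: Int => Int = r => r * 2  // stand-in projection

    // Shape 1: count inside the projecting map (one fused iterator).
    val fused = Iterator(1, 2, 3).map { r => numOutputRows += 1; proj(r) }
    fused.foreach(_ => ())

    // Shape 2: project first, then count in a second map (two chained iterators).
    numOutputRows = 0L
    val counted = Iterator(1, 2, 3).map(proj).map { r => numOutputRows += 1; r }
    counted.foreach(_ => ())
    println(s"rows seen: $numOutputRows")  // 3 in both shapes
    ```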
    
    **Start spark-shell:**
    
    > ./spark-shell --executor-memory 15G --total-executor-cores 1 --conf spark.executor.cores=1
    
    
    **Test code:**

    ```scala
    val df4 = (0 until 500000)
      .map(i => (i % 2, i % 3, i % 4, i % 5, i % 6, i % 7, i % 8))
      .toDF("i2", "i3", "i4", "i5", "i6", "i7", "i8")
    df4.write.format("parquet")
      .partitionBy("i2", "i3", "i4")
      .bucketBy(8, "i5", "i6", "i7", "i8")
      .saveAsTable("table500000")

    def runBenchmark(name: String, cardinality: Int)(f: => Unit): Unit = {
      val startTime = System.nanoTime
      (0 to cardinality).foreach(i => f)
      val endTime = System.nanoTime
      println(s"Time taken in $name: " + (endTime - startTime).toDouble / 1000000000 + " seconds")
    }

    def benchmark(name: String, card: Int)(f: => Unit): Unit = {
      (0 to card).foreach(i => f)
    }

    // After modifying FileSourceScanExec:
    benchmark("FileSourceScanExec", 2) {
      runBenchmark("After modified FileSourceScanExec", 200) {
        spark.conf.set("spark.sql.codegen.maxFields", 2)
        spark.conf.set("spark.sql.parquet.enableVectorizedReader", true)
        spark.sql("select * from table500000").count()
      }
    }

    // Before modifying FileSourceScanExec:
    benchmark("FileSourceScanExec", 2) {
      runBenchmark("Before modified FileSourceScanExec", 200) {
        spark.conf.set("spark.sql.codegen.maxFields", 2)
        spark.conf.set("spark.sql.parquet.enableVectorizedReader", false)
        spark.sql("select * from table500000").count()
      }
    }
    ```
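    For anyone rerunning this, a slightly more careful timing helper could be used (a sketch in the same spirit as `runBenchmark` above; the untimed warm-up pass is my addition, so that JIT compilation and file-system caching are amortized before measuring):

    ```scala
    // Sketch of a timing helper with an untimed warm-up phase (hypothetical,
    // not part of the PR). Paste into spark-shell alongside the code above.
    def timed(name: String, warmup: Int, iters: Int)(f: => Unit): Unit = {
      (0 until warmup).foreach(_ => f)  // warm-up: JIT, code and FS caches
      val start = System.nanoTime
      (0 until iters).foreach(_ => f)
      val totalSecs = (System.nanoTime - start) / 1e9
      println(f"$name: $totalSecs%.2f s total, ${totalSecs / iters}%.3f s/iter")
    }
    ```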
    
    **Test results:**

    Test 20 times:

    | Run             | 1st (s) | 2nd (s) | 3rd (s) | avg (s) |
    |-----------------|---------|---------|---------|---------|
    | Before modified | 10.97   | 10.83   | 11.05   | 10.95   |
    | After modified  | 9.33    | 9.61    | 9.32    | 9.42    |

    Test 100 times:

    | Run             | 1st (s) | 2nd (s) | 3rd (s) | avg (s) |
    |-----------------|---------|---------|---------|---------|
    | Before modified | 51.74   | 52.80   | 71.88   | 58.80   |
    | After modified  | 47.24   | 46.18   | 48.92   | 47.45   |

    Test 200 times:

    | Run             | 1st (s) | 2nd (s) | 3rd (s) | avg (s) |
    |-----------------|---------|---------|---------|---------|
    | Before modified | 236.85  | 325.97  | 395.69  | 319.50  |
    | After modified  | 208.90  | 244.13  | 261.18  | 238.07  |
    Thanks.
    
    


