Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10593#discussion_r49636533
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala ---
    @@ -87,15 +91,62 @@ object ParquetReadBenchmark {
                   if (!record.isNullAt(0)) sum += record.getInt(0)
                 }
                 reader.close()
    -        }}
    +          }
    +        }
    +
    +        // Driving the parquet reader in batch mode directly.
    +        benchmark.addCase("ParquetReader(Batched)") { num =>
    +          var sum = 0L
    +          files.map(_.asInstanceOf[String]).foreach { p =>
    +            val reader = new UnsafeRowParquetRecordReader
    +            try {
    +              reader.initialize(p, ("id" :: Nil).asJava)
    +              val batch = reader.resultBatch()
    +              val col = batch.column(0)
    +              while (reader.nextBatch()) {
    +                val numRows = batch.numRows()
    +                var i = 0
    +                while (i < numRows) {
    +                  if (!col.getIsNull(i)) sum += col.getInt(i)
    +                  i += 1
    +                }
    +              }
    +            } finally {
    +              reader.close()
    +            }
    +          }
    +        }
    +
    +        // Decoding in vectorized but having the reader return rows.
    +        benchmark.addCase("ParquetReader(Batch -> Row)") { num =>
    +          var sum = 0L
    +          files.map(_.asInstanceOf[String]).foreach { p =>
    +            val reader = new UnsafeRowParquetRecordReader
    +            try {
    +              reader.initialize(p, ("id" :: Nil).asJava)
    +              val batch = reader.resultBatch()
    +              while (reader.nextBatch()) {
    +                val it = batch.rowIterator()
    +                while (it.hasNext) {
    +                  val record = it.next()
    +                  if (!record.isNullAt(0)) sum += record.getInt(0)
    +                }
    +              }
    +            } finally {
    +              reader.close()
    +            }
    +          }
    +        }
     
             /*
    -          Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
    -          Single Int Column Scan:      Avg Time(ms)    Avg Rate(M/s)  Relative Rate
    -          -------------------------------------------------------------------------
    -          SQL Parquet Reader                 1910.0            13.72         1.00 X
    -          SQL Parquet MR                     2330.0            11.25         0.82 X
    -          ParquetReader                      1252.6            20.93         1.52 X
    +        Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
    +        Single Int Column Scan:            Avg Time(ms)    Avg Rate(M/s)  Relative Rate
    +        -------------------------------------------------------------------------------
    +        SQL Parquet Reader                       1682.6            15.58         1.00 X
    +        SQL Parquet MR                           2379.6            11.02         0.71 X
    +        ParquetReader                            1033.0            25.38         1.63 X
    --- End diff --
    
    Should we separate these into two groups, one that uses SQL and one that does not? It's not clear what comparing SQL Parquet Reader against ParquetReader(Batched) means (should that be SQL ParquetReader(Batched)?).
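To make the distinction concrete, here is a minimal self-contained sketch of the two non-SQL access patterns the diff benchmarks: summing a nullable int column by indexing directly into the column ("Batched") versus going through a row iterator ("Batch -> Row"). The `IntColumn` and `Batch` classes below are toy stand-ins invented for illustration; they are not Spark's `ColumnVector`/`ColumnarBatch` API.

```scala
// Toy columnar batch (NOT Spark's API): one nullable int column.
final class IntColumn(val values: Array[Int], val nulls: Array[Boolean]) {
  def getIsNull(i: Int): Boolean = nulls(i)
  def getInt(i: Int): Int = values(i)
}

final class Batch(col: IntColumn) {
  def numRows: Int = col.values.length
  def column(ordinal: Int): IntColumn = col
  // Row-at-a-time view over the same data, as in the "Batch -> Row" case.
  def rowIterator(): Iterator[(Boolean, Int)] =
    (0 until numRows).iterator.map(i => (col.getIsNull(i), col.getInt(i)))
}

val batch = new Batch(
  new IntColumn(Array(1, 2, 3, 4), Array(false, true, false, false)))

// "Batched": index straight into the column, no per-row object.
var batchSum = 0L
val c = batch.column(0)
var i = 0
while (i < batch.numRows) {
  if (!c.getIsNull(i)) batchSum += c.getInt(i)
  i += 1
}

// "Batch -> Row": the same null-aware sum through a row iterator.
var rowSum = 0L
val it = batch.rowIterator()
while (it.hasNext) {
  val (isNull, v) = it.next()
  if (!isNull) rowSum += v
}

println(batchSum) // 8
println(rowSum)   // 8
```

Both cases compute the same answer; the benchmark numbers in the diff measure only the per-row overhead that the iterator path adds on top of the shared decoding work.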

