bersprockets commented on a change in pull request #34659:
URL: https://github.com/apache/spark/pull/34659#discussion_r774322647



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
##########
@@ -103,9 +104,14 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     fileReader.setRequestedSchema(requestedSchema);
     String sparkRequestedSchemaString =
         configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
-    this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
+    StructType sparkRequestedSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
+    ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(configuration);
+    this.parquetColumn = converter.convertParquetColumn(requestedSchema,

Review comment:
   >see what's the causing the perf issue in ParquetToSparkSchemaConverter#convertInternal
   
   It's definitely the n\*n (or n\*(n/2) on average) loop here:
   ```
       val converted = (0 until groupColumn.getChildrenCount).map { i =>
         val field = groupColumn.getChild(i)
         val fieldFromReadSchema = sparkReadSchema.flatMap { schema =>
           schema.find(f => isSameFieldName(f.name, field.getName, caseSensitive))
         }
   ```
   
   `schema.find` should instead be a map lookup keyed on the field name, built once outside the loop. Roughly the sketch below.
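   Untested sketch of what I mean (it reuses the surrounding names from the snippet above, `sparkReadSchema`, `groupColumn`, `caseSensitive`, and glosses over duplicate field names: `find` keeps the first match while `toMap` keeps the last):
   ```
   import java.util.Locale

   // Build the name -> StructField map once, before iterating over the children.
   // Key on the lower-cased name when matching case-insensitively.
   val fieldMap = sparkReadSchema.map { schema =>
     schema.fields.map { f =>
       val key = if (caseSensitive) f.name else f.name.toLowerCase(Locale.ROOT)
       key -> f
     }.toMap
   }

   val converted = (0 until groupColumn.getChildrenCount).map { i =>
     val field = groupColumn.getChild(i)
     val key = if (caseSensitive) field.getName else field.getName.toLowerCase(Locale.ROOT)
     // O(1) lookup per child instead of a linear scan of the read schema
     val fieldFromReadSchema = fieldMap.flatMap(_.get(key))
     // ... rest unchanged
   }
   ```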
   
   >it'd be great if you already have some code snippet to share
   
   Sure, here it is in a verbose state (edit: I ran with `--master "local[1]"`):
   
   ```
   // test for wide rows: write 25 parquet files, 5000 rows each, 6000 columns wide
   
   val home_candidate = sys.env("HOME")
   val home = if (!home_candidate.endsWith("/")) {
     s"${home_candidate}/"
   } else {
     home_candidate
   }
   
   val width = 6000
   
   val selectExpr = (1 to width).map { i =>
     s"value as c$i"
   }
   
   
   import scala.util.Random
   val r = new Random(65657652L)
   
   val loopCount = 25
   for (i <- 0 until loopCount) {
     // println(s"iteration $i")
     val df = spark.range(5000).map(_ => r.nextLong).toDF().selectExpr(selectExpr: _*)
     val mode = if (i == 0) {
       "overwrite"
     } else {
       "append"
     }
     df.coalesce(1).write.mode(mode).format("parquet").save(s"${home}data/wide_row_parquet")
   }
   
   
   // read test (home is recomputed so this part can be pasted into a fresh shell)
   
   val home_candidate = sys.env("HOME")
   val home = if (!home_candidate.endsWith("/")) {
     s"${home_candidate}/"
   } else {
     home_candidate
   }

   spark.read.parquet(s"${home}data/wide_row_parquet").createOrReplaceTempView("df")
   val startTime = System.currentTimeMillis
   sql("select * from df where (c5*c12) == 12").collect
   (System.currentTimeMillis - startTime)/60.0/1000  // elapsed time in minutes
   ```
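   (Aside, not part of the run above: `spark.time` would also work for the timing step and prints the elapsed milliseconds directly.)
   ```
   spark.time {
     sql("select * from df where (c5*c12) == 12").collect
   }
   ```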



