bersprockets commented on a change in pull request #34659:
URL: https://github.com/apache/spark/pull/34659#discussion_r774322647
##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
##########
@@ -103,9 +104,14 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     fileReader.setRequestedSchema(requestedSchema);
     String sparkRequestedSchemaString =
       configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
-    this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
+    StructType sparkRequestedSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
+    ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(configuration);
+    this.parquetColumn = converter.convertParquetColumn(requestedSchema,
Review comment:
> see what's causing the perf issue in `ParquetToSparkSchemaConverter#convertInternal`

It's definitely the `n*n` (or `n*(n/2)` on average) loop here:
```scala
val converted = (0 until groupColumn.getChildrenCount).map { i =>
  val field = groupColumn.getChild(i)
  val fieldFromReadSchema = sparkReadSchema.flatMap { schema =>
    schema.find(f => isSameFieldName(f.name, field.getName, caseSensitive))
  }
```
`schema.find` is an O(n) scan per child, so it should instead be a map lookup built once per group, or something like that.
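Roughly what I have in mind, as a sketch (the helper names here are hypothetical, not code from this PR, and I'm assuming case-insensitive matching can be approximated by lowercasing both sides with `Locale.ROOT`, which is close to what `isSameFieldName` does):

```scala
import java.util.Locale
import org.apache.spark.sql.types.{StructField, StructType}

// Build the name -> field map once per group. Assumes the read schema has
// no two fields that collide after lowercasing in the case-insensitive case.
def buildFieldLookup(
    sparkReadSchema: Option[StructType],
    caseSensitive: Boolean): Map[String, StructField] =
  sparkReadSchema match {
    case Some(schema) =>
      schema.fields.map { f =>
        val key = if (caseSensitive) f.name else f.name.toLowerCase(Locale.ROOT)
        key -> f
      }.toMap
    case None => Map.empty
  }

// Drop-in for `sparkReadSchema.flatMap(_.find(...))` in the per-child loop:
// each child now resolves in O(1) instead of scanning the read schema.
def lookupField(
    lookup: Map[String, StructField],
    parquetFieldName: String,
    caseSensitive: Boolean): Option[StructField] = {
  val key =
    if (caseSensitive) parquetFieldName
    else parquetFieldName.toLowerCase(Locale.ROOT)
  lookup.get(key)
}
```

With that, the inner `schema.find` collapses to `lookupField(fieldLookup, field.getName, caseSensitive)`, taking the per-group cost from O(n\*n) to O(n).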
> it'd be great if you already have some code snippet to share
Sure, in a somewhat verbose form (edit: I ran with `--master "local[1]"`):
```scala
// Write test: build `loopCount` single-file parquet outputs of 5000 rows
// by `width` bigint columns under $HOME/data/wide_row_parquet.
import scala.util.Random

val home = sys.env("HOME").stripSuffix("/") + "/"
val width = 6000
val selectExpr = (1 to width).map(i => s"value as c$i")
val r = new Random(65657652L)
val loopCount = 25
for (i <- 0 until loopCount) {
  val df = spark.range(5000).map(_ => r.nextLong).toDF().selectExpr(selectExpr: _*)
  val mode = if (i == 0) "overwrite" else "append"
  df.coalesce(1).write.mode(mode).format("parquet").save(s"${home}data/wide_row_parquet")
}

// Read test: scan all 6000 columns with a filter and time the query.
spark.read.parquet(s"${home}data/wide_row_parquet").createOrReplaceTempView("df")
val startTime = System.currentTimeMillis
sql("select * from df where (c5*c12) == 12").collect
(System.currentTimeMillis - startTime) / 60.0 / 1000  // elapsed time in minutes
```