cloud-fan commented on code in PR #52557:
URL: https://github.com/apache/spark/pull/52557#discussion_r2441534000
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala:
##########
@@ -472,6 +491,64 @@ object ParquetReadSupport extends Logging {
matchCaseInsensitiveField(f)
}
}
+ // Ignore MessageType, because it is the root of the schema, not a struct.
+ if (returnNullStructIfAllFieldsMissing || !isStructWithMissingFields ||
+ parquetRecord.isInstanceOf[MessageType]) {
+ clippedType
+ } else {
+ // Read one arbitrary field to understand when the struct value is null
or not null.
+ clippedType :+ findCheapestGroupField(parquetRecord)
+ }
+ }
+
+ /**
+ * Finds the leaf node under a given file schema node that is likely to be
cheapest to fetch.
+ * Keeps this leaf node inside the same parent hierarchy. This is used when
all struct fields in
+ * the requested schema are missing. Uses a very simple heuristic based on
the parquet type.
+ */
+ private def findCheapestGroupField(parentGroupType: GroupType): Type = {
+ def findCheapestGroupFieldRecurse(curType: Type, repLevel: Int = 0):
(Type, Int, Int) = {
+ curType match {
+ case groupType: GroupType =>
+ var (bestType, bestRepLevel, bestCost) = (Option.empty[Type], 0, 0)
+ for (field <- groupType.getFields.asScala) {
+ val newRepLevel = repLevel + (if
(field.isRepetition(Repetition.REPEATED)) 1 else 0)
+ // Never take a field at a deeper repetition level, since it's
likely to have more data.
+ // Don't do safety checks because we should already have done them
when traversing the
+ // schema for the first time.
+ if (bestType.isEmpty || newRepLevel <= bestRepLevel) {
+ val (childType, childRepLevel, childCost) =
+ findCheapestGroupFieldRecurse(field, newRepLevel)
+ // Always prefer elements with a lower repetition level, since
more nesting of arrays
+ // is likely to result in more data. At the same repetition
level, prefer the smaller
+ // type.
+ if (bestType.isEmpty || childRepLevel < bestRepLevel ||
+ (childRepLevel == bestRepLevel && childCost < bestCost)) {
+ // This is the new best path.
+ bestType = Some(childType)
+ bestRepLevel = childRepLevel
+ bestCost = childCost
+ }
+ }
+ }
+ (groupType.withNewFields(bestType.get), bestRepLevel, bestCost)
+ case primitiveType: PrimitiveType =>
+ val cost = primitiveType.getPrimitiveTypeName match {
+ case PrimitiveType.PrimitiveTypeName.BOOLEAN => 1
+ case PrimitiveType.PrimitiveTypeName.INT32 => 4
+ case PrimitiveType.PrimitiveTypeName.INT64 => 8
+ case PrimitiveType.PrimitiveTypeName.FLOAT => 4
+ case PrimitiveType.PrimitiveTypeName.DOUBLE => 8
+ // Strings seem undesirable, since they don't have a fixed size.
Give them a high cost.
+ case PrimitiveType.PrimitiveTypeName.BINARY |
+ PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY => 32
+ case PrimitiveType.PrimitiveTypeName.INT96 => 12
Review Comment:
Shall we be defensive and add a default case with a high cost, in case a new primitive type
is added in the future?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]