cloud-fan commented on a change in pull request #32301:
URL: https://github.com/apache/spark/pull/32301#discussion_r638943696
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
##########
@@ -227,137 +297,114 @@ object NestedColumnAliasing {
}
/**
- * This prunes unnecessary nested columns from `Generate` and optional
`Project` on top
- * of it.
+ * This prunes unnecessary nested columns from [[Generate]], or [[Project]] ->
[[Generate]]
*/
object GeneratorNestedColumnAliasing {
- // Partitions `attrToAliases` based on whether the attribute is in
Generator's output.
- private def aliasesOnGeneratorOutput(
- attrToAliases: Map[ExprId, Seq[Alias]],
- generatorOutput: Seq[Attribute]) = {
- val generatorOutputExprId = generatorOutput.map(_.exprId)
- attrToAliases.partition { k =>
- generatorOutputExprId.contains(k._1)
- }
- }
-
- // Partitions `nestedFieldToAlias` based on whether the attribute of nested
field extractor
- // is in Generator's output.
- private def nestedFieldOnGeneratorOutput(
- nestedFieldToAlias: Map[ExtractValue, Alias],
- generatorOutput: Seq[Attribute]) = {
- val generatorOutputSet = AttributeSet(generatorOutput)
- nestedFieldToAlias.partition { pair =>
- pair._1.references.subsetOf(generatorOutputSet)
- }
- }
-
def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
// Either `nestedPruningOnExpressions` or `nestedSchemaPruningEnabled` is
enabled, we
// need to prune nested columns through Project and under Generate. The
difference is
// when `nestedSchemaPruningEnabled` is on, nested columns will be pruned
further at
// file format readers if it is supported.
case Project(projectList, g: Generate) if
(SQLConf.get.nestedPruningOnExpressions ||
- SQLConf.get.nestedSchemaPruningEnabled) &&
canPruneGenerator(g.generator) =>
+ SQLConf.get.nestedSchemaPruningEnabled) &&
canPruneGenerator(g.generator) =>
// On top of `Generate`, a `Project` that might have nested column
accessors.
// We try to get alias maps for both project list and generator's
children expressions.
- val exprsToPrune = projectList ++ g.generator.children
- NestedColumnAliasing.getAliasSubMap(exprsToPrune).map {
- case (nestedFieldToAlias, attrToAliases) =>
- val (nestedFieldsOnGenerator, nestedFieldsNotOnGenerator) =
- nestedFieldOnGeneratorOutput(nestedFieldToAlias,
g.qualifiedGeneratorOutput)
- val (attrToAliasesOnGenerator, attrToAliasesNotOnGenerator) =
- aliasesOnGeneratorOutput(attrToAliases, g.qualifiedGeneratorOutput)
-
- // Push nested column accessors through `Generator`.
- // Defer updating `Generate.unrequiredChildIndex` to next round of
`ColumnPruning`.
- val newChild = NestedColumnAliasing.replaceWithAliases(g,
- nestedFieldsNotOnGenerator, attrToAliasesNotOnGenerator)
- val pushedThrough = Project(NestedColumnAliasing
- .getNewProjectList(projectList, nestedFieldsNotOnGenerator),
newChild)
-
- // If the generator output is `ArrayType`, we cannot push through
the extractor.
- // It is because we don't allow field extractor on two-level array,
- // i.e., attr.field when attr is a ArrayType(ArrayType(...)).
- // Similarly, we also cannot push through if the child of generator
is `MapType`.
- g.generator.children.head.dataType match {
- case _: MapType => return Some(pushedThrough)
- case ArrayType(_: ArrayType, _) => return Some(pushedThrough)
- case _ =>
- }
-
- // Pruning on `Generator`'s output. We only process single field
case.
- // For multiple field case, we cannot directly move field extractor
into
- // the generator expression. A workaround is to re-construct array
of struct
- // from multiple fields. But it will be more complicated and may not
be worth it.
- // TODO(SPARK-34956): support multiple fields.
- if (nestedFieldsOnGenerator.size > 1 ||
nestedFieldsOnGenerator.isEmpty) {
- pushedThrough
- } else {
- // Only one nested column accessor.
- // E.g., df.select(explode($"items").as("item")).select($"item.a")
- pushedThrough match {
- case p @ Project(_, newG: Generate) =>
- // Replace the child expression of `ExplodeBase` generator with
- // nested column accessor.
- // E.g.,
df.select(explode($"items").as("item")).select($"item.a") =>
- // df.select(explode($"items.a").as("item.a"))
- val rewrittenG = newG.transformExpressions {
- case e: ExplodeBase =>
- val extractor =
nestedFieldsOnGenerator.head._1.transformUp {
- case _: Attribute =>
- e.child
- case g: GetStructField =>
- ExtractValue(g.child, Literal(g.extractFieldName),
SQLConf.get.resolver)
- }
- e.withNewChildren(Seq(extractor))
- }
+ val attrToExtractValues =
NestedColumnAliasing.getAttributeToExtractValues(
+ projectList ++ g.generator.children, Seq.empty)
+ if (attrToExtractValues.isEmpty) {
+ return None
+ }
+ val generatorOutputSet = AttributeSet(g.qualifiedGeneratorOutput)
+ val (attrToExtractValuesOnGenerator, attrToExtractValuesNotOnGenerator) =
+ attrToExtractValues.partition { case (attr, _) =>
+ attr.references.subsetOf(generatorOutputSet) }
+
+ val pushedThrough = NestedColumnAliasing.rewritePlanWithAliases(
+ plan, attrToExtractValuesNotOnGenerator)
+
+ // If the generator output is `ArrayType`, we cannot push through the
extractor.
+ // It is because we don't allow field extractor on two-level array,
+ // i.e., attr.field when attr is a ArrayType(ArrayType(...)).
+ // Similarly, we also cannot push through if the child of generator is
`MapType`.
+ g.generator.children.head.dataType match {
+ case _: MapType => return Some(pushedThrough)
+ case ArrayType(_: ArrayType, _) => return Some(pushedThrough)
+ case _ =>
+ }
- // As we change the child of the generator, its output data
type must be updated.
- val updatedGeneratorOutput = rewrittenG.generatorOutput
- .zip(rewrittenG.generator.elementSchema.toAttributes)
- .map { case (oldAttr, newAttr) =>
- newAttr.withExprId(oldAttr.exprId).withName(oldAttr.name)
- }
- assert(updatedGeneratorOutput.length ==
rewrittenG.generatorOutput.length,
- "Updated generator output must have the same length " +
- "with original generator output.")
- val updatedGenerate = rewrittenG.copy(generatorOutput =
updatedGeneratorOutput)
-
- // Replace nested column accessor with generator output.
- p.withNewChildren(Seq(updatedGenerate)).transformExpressions {
- case f: ExtractValue if nestedFieldsOnGenerator.contains(f)
=>
- updatedGenerate.output
- .find(a => attrToAliasesOnGenerator.contains(a.exprId))
- .getOrElse(f)
+ // Pruning on `Generator`'s output. We only process single field case.
+ // For multiple field case, we cannot directly move field extractor into
+ // the generator expression. A workaround is to re-construct array of
struct
+ // from multiple fields. But it will be more complicated and may not
be worth it.
+ // TODO(SPARK-34956): support multiple fields.
+ val nestedFieldsOnGenerator =
attrToExtractValuesOnGenerator.values.flatten.toSet
+ if (nestedFieldsOnGenerator.size > 1 || nestedFieldsOnGenerator.isEmpty)
{
+ Some(pushedThrough)
+ } else {
+ // Only one nested column accessor.
Review comment:
I didn't carefully review the code in the else branch. I assume it's the
same as the previous code?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]