cloud-fan commented on a change in pull request #32301:
URL: https://github.com/apache/spark/pull/32301#discussion_r638943696



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
##########
@@ -227,137 +297,114 @@ object NestedColumnAliasing {
 }
 
 /**
- * This prunes unnecessary nested columns from `Generate` and optional 
`Project` on top
- * of it.
+ * This prunes unnecessary nested columns from [[Generate]], or [[Project]] -> 
[[Generate]]
  */
 object GeneratorNestedColumnAliasing {
-  // Partitions `attrToAliases` based on whether the attribute is in 
Generator's output.
-  private def aliasesOnGeneratorOutput(
-      attrToAliases: Map[ExprId, Seq[Alias]],
-      generatorOutput: Seq[Attribute]) = {
-    val generatorOutputExprId = generatorOutput.map(_.exprId)
-    attrToAliases.partition { k =>
-      generatorOutputExprId.contains(k._1)
-    }
-  }
-
-  // Partitions `nestedFieldToAlias` based on whether the attribute of nested 
field extractor
-  // is in Generator's output.
-  private def nestedFieldOnGeneratorOutput(
-      nestedFieldToAlias: Map[ExtractValue, Alias],
-      generatorOutput: Seq[Attribute]) = {
-    val generatorOutputSet = AttributeSet(generatorOutput)
-    nestedFieldToAlias.partition { pair =>
-      pair._1.references.subsetOf(generatorOutputSet)
-    }
-  }
-
   def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
     // Either `nestedPruningOnExpressions` or `nestedSchemaPruningEnabled` is 
enabled, we
     // need to prune nested columns through Project and under Generate. The 
difference is
     // when `nestedSchemaPruningEnabled` is on, nested columns will be pruned 
further at
     // file format readers if it is supported.
     case Project(projectList, g: Generate) if 
(SQLConf.get.nestedPruningOnExpressions ||
-        SQLConf.get.nestedSchemaPruningEnabled) && 
canPruneGenerator(g.generator) =>
+      SQLConf.get.nestedSchemaPruningEnabled) && 
canPruneGenerator(g.generator) =>
       // On top on `Generate`, a `Project` that might have nested column 
accessors.
       // We try to get alias maps for both project list and generator's 
children expressions.
-      val exprsToPrune = projectList ++ g.generator.children
-      NestedColumnAliasing.getAliasSubMap(exprsToPrune).map {
-        case (nestedFieldToAlias, attrToAliases) =>
-          val (nestedFieldsOnGenerator, nestedFieldsNotOnGenerator) =
-            nestedFieldOnGeneratorOutput(nestedFieldToAlias, 
g.qualifiedGeneratorOutput)
-          val (attrToAliasesOnGenerator, attrToAliasesNotOnGenerator) =
-            aliasesOnGeneratorOutput(attrToAliases, g.qualifiedGeneratorOutput)
-
-          // Push nested column accessors through `Generator`.
-          // Defer updating `Generate.unrequiredChildIndex` to next round of 
`ColumnPruning`.
-          val newChild = NestedColumnAliasing.replaceWithAliases(g,
-            nestedFieldsNotOnGenerator, attrToAliasesNotOnGenerator)
-          val pushedThrough = Project(NestedColumnAliasing
-            .getNewProjectList(projectList, nestedFieldsNotOnGenerator), 
newChild)
-
-          // If the generator output is `ArrayType`, we cannot push through 
the extractor.
-          // It is because we don't allow field extractor on two-level array,
-          // i.e., attr.field when attr is a ArrayType(ArrayType(...)).
-          // Similarily, we also cannot push through if the child of generator 
is `MapType`.
-          g.generator.children.head.dataType match {
-            case _: MapType => return Some(pushedThrough)
-            case ArrayType(_: ArrayType, _) => return Some(pushedThrough)
-            case _ =>
-          }
-
-          // Pruning on `Generator`'s output. We only process single field 
case.
-          // For multiple field case, we cannot directly move field extractor 
into
-          // the generator expression. A workaround is to re-construct array 
of struct
-          // from multiple fields. But it will be more complicated and may not 
worth.
-          // TODO(SPARK-34956): support multiple fields.
-          if (nestedFieldsOnGenerator.size > 1 || 
nestedFieldsOnGenerator.isEmpty) {
-            pushedThrough
-          } else {
-            // Only one nested column accessor.
-            // E.g., df.select(explode($"items").as("item")).select($"item.a")
-            pushedThrough match {
-              case p @ Project(_, newG: Generate) =>
-                // Replace the child expression of `ExplodeBase` generator with
-                // nested column accessor.
-                // E.g., 
df.select(explode($"items").as("item")).select($"item.a") =>
-                //       df.select(explode($"items.a").as("item.a"))
-                val rewrittenG = newG.transformExpressions {
-                  case e: ExplodeBase =>
-                    val extractor = 
nestedFieldsOnGenerator.head._1.transformUp {
-                      case _: Attribute =>
-                        e.child
-                      case g: GetStructField =>
-                        ExtractValue(g.child, Literal(g.extractFieldName), 
SQLConf.get.resolver)
-                    }
-                    e.withNewChildren(Seq(extractor))
-                }
+      val attrToExtractValues = 
NestedColumnAliasing.getAttributeToExtractValues(
+        projectList ++ g.generator.children, Seq.empty)
+      if (attrToExtractValues.isEmpty) {
+        return None
+      }
+      val generatorOutputSet = AttributeSet(g.qualifiedGeneratorOutput)
+      val (attrToExtractValuesOnGenerator, attrToExtractValuesNotOnGenerator) =
+        attrToExtractValues.partition { case (attr, _) =>
+          attr.references.subsetOf(generatorOutputSet) }
+
+      val pushedThrough = NestedColumnAliasing.rewritePlanWithAliases(
+        plan, attrToExtractValuesNotOnGenerator)
+
+      // If the generator output is `ArrayType`, we cannot push through the 
extractor.
+      // It is because we don't allow field extractor on two-level array,
+      // i.e., attr.field when attr is a ArrayType(ArrayType(...)).
+      // Similarily, we also cannot push through if the child of generator is 
`MapType`.
+      g.generator.children.head.dataType match {
+        case _: MapType => return Some(pushedThrough)
+        case ArrayType(_: ArrayType, _) => return Some(pushedThrough)
+        case _ =>
+      }
 
-                // As we change the child of the generator, its output data 
type must be updated.
-                val updatedGeneratorOutput = rewrittenG.generatorOutput
-                  .zip(rewrittenG.generator.elementSchema.toAttributes)
-                  .map { case (oldAttr, newAttr) =>
-                  newAttr.withExprId(oldAttr.exprId).withName(oldAttr.name)
-                }
-                assert(updatedGeneratorOutput.length == 
rewrittenG.generatorOutput.length,
-                  "Updated generator output must have the same length " +
-                    "with original generator output.")
-                val updatedGenerate = rewrittenG.copy(generatorOutput = 
updatedGeneratorOutput)
-
-                // Replace nested column accessor with generator output.
-                p.withNewChildren(Seq(updatedGenerate)).transformExpressions {
-                  case f: ExtractValue if nestedFieldsOnGenerator.contains(f) 
=>
-                    updatedGenerate.output
-                      .find(a => attrToAliasesOnGenerator.contains(a.exprId))
-                      .getOrElse(f)
+      // Pruning on `Generator`'s output. We only process single field case.
+      // For multiple field case, we cannot directly move field extractor into
+      // the generator expression. A workaround is to re-construct array of 
struct
+      // from multiple fields. But it will be more complicated and may not 
worth.
+      // TODO(SPARK-34956): support multiple fields.
+      val nestedFieldsOnGenerator = 
attrToExtractValuesOnGenerator.values.flatten.toSet
+      if (nestedFieldsOnGenerator.size > 1 || nestedFieldsOnGenerator.isEmpty) 
{
+        Some(pushedThrough)
+      } else {
+        // Only one nested column accessor.

Review comment:
       I didn't carefully review the code in the else branch. I assume it's the 
same as the previous code?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to