cloud-fan commented on code in PR #39673:
URL: https://github.com/apache/spark/pull/39673#discussion_r1177978929
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala:
##########
@@ -659,22 +659,46 @@ object CoGroup {
right: LogicalPlan): LogicalPlan = {
require(StructType.fromAttributes(leftGroup) ==
StructType.fromAttributes(rightGroup))
+ // SPARK-42132: The DeduplicateRelations rule would replace duplicate attributes
+ // in the right plan and rewrite rightGroup and rightAttr. But it would also rewrite
+ // leftGroup and leftAttr, which is wrong. Additionally, it does not rewrite rightDeserializer.
+ // Aliasing duplicate attributes in the right plan deduplicates them and stops
+ // DeduplicateRelations to do harm.
+ val duplicateAttributes = AttributeMap(
+ right.output.filter(left.output.contains).map(a => a -> Alias(a, a.name)())
+ )
+
+ def dedup(attrs: Seq[Attribute]): Seq[NamedExpression] =
+ attrs.map(attr => duplicateAttributes.getOrElse(attr, attr))
+
+ // rightOrder is resolved against right plan, so deduplication not needed
+ val (dedupRightGroup, dedupRightAttr, dedupRight) =
+ if (duplicateAttributes.nonEmpty) {
Review Comment:
It's a bit weird to do dedup here. Can we update the `DeduplicateRelations`
rule to handle `CoGroup` specially?
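
For context, here is a minimal standalone sketch of the aliasing idea in the hunk above. It uses a toy `Attr` model rather than Spark's Catalyst classes: `Attr`, `DedupSketch`, and `aliasDuplicates` are hypothetical names for illustration only, and the fresh-ID scheme is an assumption standing in for what `Alias(a, a.name)()` does when it allocates a new expression ID.

```scala
// Toy model, NOT Spark's Catalyst API: an attribute is a name plus an expression ID.
final case class Attr(name: String, exprId: Long)

object DedupSketch {
  // Map every right-side attribute that also occurs on the left to a copy
  // with a fresh ID and the same name (stand-in for Alias(a, a.name)()).
  def aliasDuplicates(left: Seq[Attr], right: Seq[Attr]): Map[Attr, Attr] = {
    // Assumption: any ID larger than all existing IDs counts as "fresh".
    val base = (left ++ right).map(_.exprId).foldLeft(0L)((m, id) => math.max(m, id))
    right
      .filter(a => left.contains(a))
      .zipWithIndex
      .map { case (a, i) => a -> a.copy(exprId = base + 1 + i) }
      .toMap
  }

  // Replace attributes that have an alias; leave all others untouched.
  def dedup(attrs: Seq[Attr], aliases: Map[Attr, Attr]): Seq[Attr] =
    attrs.map(a => aliases.getOrElse(a, a))

  def main(args: Array[String]): Unit = {
    val left = Seq(Attr("id", 1L), Attr("value", 2L))
    val right = Seq(Attr("id", 1L), Attr("other", 3L)) // "id" is shared with left
    val aliases = aliasDuplicates(left, right)
    val dedupRight = dedup(right, aliases)
    // After aliasing, the two sides no longer share any attribute,
    // while the right side keeps its column names.
    assert(dedupRight.intersect(left).isEmpty)
    assert(dedupRight.map(_.name) == right.map(_.name))
    println(dedupRight)
  }
}
```

Once the right side carries only fresh IDs, a later deduplication pass finds nothing left to rewrite, which is the effect the code comment in the diff describes. Whether this belongs in `CoGroup.apply` or in the rule itself is the open question raised above.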