Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18121#discussion_r195288117
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala 
---
    @@ -58,6 +59,24 @@ case class ReusedExchangeExec(override val output: 
Seq[Attribute], child: Exchan
       override protected[sql] def doExecuteBroadcast[T](): 
broadcast.Broadcast[T] = {
         child.executeBroadcast()
       }
    +
    +  // `ReusedExchangeExec` can have distinct set of output attribute ids 
from its child, we need
    +  // to update the attribute ids in `outputPartitioning` and 
`outputOrdering`.
    +  private lazy val updateAttr: Expression => Expression = {
    +    val originalAttrToNewAttr = AttributeMap(child.output.zip(output))
    +    e => e.transform {
    +      case attr: Attribute => originalAttrToNewAttr.getOrElse(attr, attr)
    +    }
    +  }
    +
    +  override def outputPartitioning: Partitioning = child.outputPartitioning 
match {
    +    case h: HashPartitioning => h.copy(expressions = 
h.expressions.map(updateAttr))
    +    case other => other
    +  }
    +
    --- End diff --
    
    @cloud-fan @viirya Could you help explain why we only consider 
`HashPartitioning` here?
    How about `RangePartitioning`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to