GitHub user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/5717#discussion_r32327276
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
---
@@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer
case class SortMergeJoin(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
+ joinType: JoinType,
left: SparkPlan,
- right: SparkPlan) extends BinaryNode {
+ right: SparkPlan,
+ condition: Option[Expression] = None) extends BinaryNode {
- override def output: Seq[Attribute] = left.output ++ right.output
+ val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match {
+ case RightOuter => (right, left, rightKeys, leftKeys)
+ case _ => (left, right, leftKeys, rightKeys)
+ }
+
+ override def output: Seq[Attribute] = joinType match {
+ case Inner =>
+ left.output ++ right.output
+ case LeftOuter =>
+ left.output ++ right.output.map(_.withNullability(true))
+ case RightOuter =>
+ left.output.map(_.withNullability(true)) ++ right.output
+ case FullOuter =>
+ left.output.map(_.withNullability(true)) ++
right.output.map(_.withNullability(true))
+ case x =>
+ throw new IllegalStateException(s"SortMergeJoin should not take $x
as the JoinType")
+ }
- override def outputPartitioning: Partitioning = left.outputPartitioning
+ override def outputPartitioning: Partitioning = joinType match {
+ case FullOuter =>
+ // when doing Full Outer join, NULL rows from both sides are not so
partitioned.
+ UnknownPartitioning(streamed.outputPartitioning.numPartitions)
+ case _ => streamed.outputPartitioning
+ }
override def requiredChildDistribution: Seq[Distribution] =
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) ::
Nil
// this is to manually construct an ordering that can be used to compare
keys from both sides
- private val keyOrdering: RowOrdering =
RowOrdering.forSchema(leftKeys.map(_.dataType))
+ private val keyOrdering: RowOrdering =
RowOrdering.forSchema(streamedKeys.map(_.dataType))
- override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys)
+ override def outputOrdering: Seq[SortOrder] = joinType match {
+ case FullOuter => Nil // when doing Full Outer join, NULL rows from
both sides are not ordered.
+ case _ => requiredOrders(streamedKeys)
+ }
override def requiredChildOrdering: Seq[Seq[SortOrder]] =
requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil
- @transient protected lazy val leftKeyGenerator = newProjection(leftKeys,
left.output)
- @transient protected lazy val rightKeyGenerator =
newProjection(rightKeys, right.output)
+ @transient protected lazy val streamedKeyGenerator =
newProjection(streamedKeys, streamed.output)
+ @transient protected lazy val bufferedKeyGenerator =
newProjection(bufferedKeys, buffered.output)
+
+ // standard null rows
+ @transient private[this] lazy val streamedNullRow = new
GenericRow(streamed.output.length)
--- End diff --
Can we move `streamedNullRow` and `bufferedNullRow` into the
`zipPartitions` call so that we don't have to make them `@transient lazy val`s? I
find `@transient lazy val` a bit confusing and prefer to avoid it when I can.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]