Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/5717#discussion_r32327111
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
---
@@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer
case class SortMergeJoin(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
+ joinType: JoinType,
left: SparkPlan,
- right: SparkPlan) extends BinaryNode {
+ right: SparkPlan,
+ condition: Option[Expression] = None) extends BinaryNode {
- override def output: Seq[Attribute] = left.output ++ right.output
+ val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match {
+ case RightOuter => (right, left, rightKeys, leftKeys)
+ case _ => (left, right, leftKeys, rightKeys)
+ }
+
+ override def output: Seq[Attribute] = joinType match {
+ case Inner =>
+ left.output ++ right.output
+ case LeftOuter =>
+ left.output ++ right.output.map(_.withNullability(true))
+ case RightOuter =>
+ left.output.map(_.withNullability(true)) ++ right.output
+ case FullOuter =>
+ left.output.map(_.withNullability(true)) ++
right.output.map(_.withNullability(true))
+ case x =>
+ throw new IllegalStateException(s"SortMergeJoin should not take $x
as the JoinType")
+ }
- override def outputPartitioning: Partitioning = left.outputPartitioning
+ override def outputPartitioning: Partitioning = joinType match {
+ case FullOuter =>
+ // when doing Full Outer join, NULL rows from both sides are not so
partitioned.
+ UnknownPartitioning(streamed.outputPartitioning.numPartitions)
+ case _ => streamed.outputPartitioning
+ }
override def requiredChildDistribution: Seq[Distribution] =
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) ::
Nil
// this is to manually construct an ordering that can be used to compare
keys from both sides
- private val keyOrdering: RowOrdering =
RowOrdering.forSchema(leftKeys.map(_.dataType))
+ private val keyOrdering: RowOrdering =
RowOrdering.forSchema(streamedKeys.map(_.dataType))
- override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys)
+ override def outputOrdering: Seq[SortOrder] = joinType match {
+ case FullOuter => Nil // when doing Full Outer join, NULL rows from
both sides are not ordered.
+ case _ => requiredOrders(streamedKeys)
+ }
override def requiredChildOrdering: Seq[Seq[SortOrder]] =
requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil
- @transient protected lazy val leftKeyGenerator = newProjection(leftKeys,
left.output)
- @transient protected lazy val rightKeyGenerator =
newProjection(rightKeys, right.output)
+ @transient protected lazy val streamedKeyGenerator =
newProjection(streamedKeys, streamed.output)
+ @transient protected lazy val bufferedKeyGenerator =
newProjection(bufferedKeys, buffered.output)
+
+ // standard null rows
+ @transient private[this] lazy val streamedNullRow = new
GenericRow(streamed.output.length)
+ @transient private[this] lazy val bufferedNullRow = new
GenericRow(buffered.output.length)
+
+ // checks if the joinedRow can meet condition requirements
+ @transient private[this] lazy val boundCondition =
+ condition.map(newPredicate(_, streamed.output ++
buffered.output)).getOrElse((row: Row) => true)
private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
keys.map(SortOrder(_, Ascending))
protected override def doExecute(): RDD[Row] = {
- val leftResults = left.execute().map(_.copy())
- val rightResults = right.execute().map(_.copy())
+ val streamResults = streamed.execute().map(_.copy())
--- End diff --
Why do we need to copy the streamed rows? I understand why the buffered results
must be copied: we hold on to them while iterating, and the input rows might be
mutable and reused. That shouldn't be a problem on the streamed side, though, right?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]