Github user concretevitamin commented on a diff in the pull request:
https://github.com/apache/spark/pull/1147#discussion_r14043578
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -36,158 +37,211 @@ case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide
/**
- * :: DeveloperApi ::
+ * Output the tuples for the matched (with the same join key) join groups,
accordingly to join type
*/
-@DeveloperApi
-case class HashJoin(
- leftKeys: Seq[Expression],
- rightKeys: Seq[Expression],
- buildSide: BuildSide,
- left: SparkPlan,
- right: SparkPlan) extends BinaryNode {
+trait BinaryJoinNode extends BinaryNode {
+ self: Product =>
- override def outputPartitioning: Partitioning = left.outputPartitioning
+ val SINGLE_NULL_LIST = Seq[Row](null)
+ val EMPTY_NULL_LIST = Seq[Row]()
- override def requiredChildDistribution =
- ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) ::
Nil
+ val LEFT_NULL_ROW = new GenericRow(left.output.length)
+ val RIGHT_NULL_ROW = new GenericRow(right.output.length)
- val (buildPlan, streamedPlan) = buildSide match {
- case BuildLeft => (left, right)
- case BuildRight => (right, left)
- }
+ val joinedRow = new JoinedRow()
- val (buildKeys, streamedKeys) = buildSide match {
- case BuildLeft => (leftKeys, rightKeys)
- case BuildRight => (rightKeys, leftKeys)
- }
+ val boundCondition = InterpretedPredicate(
+ condition
+ .map(c => BindReferences.bindReference(c, left.output ++
right.output))
+ .getOrElse(Literal(true)))
- def output = left.output ++ right.output
+ def condition: Option[Expression]
+ def joinType: JoinType
- @transient lazy val buildSideKeyGenerator = new Projection(buildKeys,
buildPlan.output)
- @transient lazy val streamSideKeyGenerator =
- () => new MutableProjection(streamedKeys, streamedPlan.output)
+ @inline
+ protected[this] def predicateWithSettingLeft(joinRow: JoinedRow, part:
Row) = {
+ joinedRow.setLeftRow(part)
- def execute() = {
+ boundCondition(joinedRow)
+ }
+
+ @inline
+ protected[this] def predicateWithSettingRight(joinRow: JoinedRow, part:
Row) = {
+ joinedRow.setRightRow(part)
- buildPlan.execute().zipPartitions(streamedPlan.execute()) {
(buildIter, streamIter) =>
- // TODO: Use Spark's HashMap implementation.
- val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]()
- var currentRow: Row = null
-
- // Create a mapping of buildKeys -> rows
- while (buildIter.hasNext) {
- currentRow = buildIter.next()
- val rowKey = buildSideKeyGenerator(currentRow)
- if(!rowKey.anyNull) {
- val existingMatchList = hashTable.get(rowKey)
- val matchList = if (existingMatchList == null) {
- val newMatchList = new ArrayBuffer[Row]()
- hashTable.put(rowKey, newMatchList)
- newMatchList
- } else {
- existingMatchList
- }
- matchList += currentRow.copy()
+ boundCondition(joinedRow)
+ }
+
+ def leftOuterIterator(key: Row, leftIter: Iterable[Row], rightIter:
Iterable[Row])
+ : Iterator[Row] = {
+ leftIter.iterator.flatMap { l =>
+ joinedRow.setLeftRow(l)
+ var matched = false
+ (if(!key.anyNull) rightIter else EMPTY_NULL_LIST).collect {
+ case r if (predicateWithSettingRight(joinedRow, r)) => {
+ matched = true
+ joinedRow.copy
+ }
+ } ++ SINGLE_NULL_LIST.collect {
+ case dummy if (!matched) => {
+ joinedRow.setRightRow(RIGHT_NULL_ROW)
+ joinedRow.copy
}
}
+ }
+ }
- new Iterator[Row] {
- private[this] var currentStreamedRow: Row = _
- private[this] var currentHashMatches: ArrayBuffer[Row] = _
- private[this] var currentMatchPosition: Int = -1
-
- // Mutable per row objects.
- private[this] val joinRow = new JoinedRow
-
- private[this] val joinKeys = streamSideKeyGenerator()
-
- override final def hasNext: Boolean =
- (currentMatchPosition != -1 && currentMatchPosition <
currentHashMatches.size) ||
- (streamIter.hasNext && fetchNext())
+ def leftSemiIterator(key: Row, leftIter: Iterable[Row], rightIter:
Iterable[Row])
+ : Iterator[Row] = {
+ leftIter.iterator.filter { l =>
+ joinedRow.setLeftRow(l)
+ (if(!key.anyNull) rightIter else EMPTY_NULL_LIST).exists {
+ case r => (predicateWithSettingRight(joinedRow, r))
+ }
+ }
+ }
- override final def next() = {
- val ret = joinRow(currentStreamedRow,
currentHashMatches(currentMatchPosition))
- currentMatchPosition += 1
- ret
+ def rightOuterIterator(key: Row, leftIter: Iterable[Row], rightIter:
Iterable[Row])
+ : Iterator[Row] = {
+ rightIter.iterator.flatMap{r =>
+ joinedRow.setRightRow(r)
+ var matched = false
+ (if(!key.anyNull) leftIter else EMPTY_NULL_LIST).collect {
+ case r if (predicateWithSettingLeft(joinedRow, r)) => {
+ matched = true
+ joinedRow.copy
+ }
+ } ++ SINGLE_NULL_LIST.collect {
+ case dummy if(!matched) => {
+ joinedRow.setLeftRow(LEFT_NULL_ROW)
+ joinedRow.copy
}
+ }
+ }
+ }
- /**
- * Searches the streamed iterator for the next row that has at
least one match in hashtable.
- *
- * @return true if the search is successful, and false the
streamed iterator runs out of
- * tuples.
- */
- private final def fetchNext(): Boolean = {
- currentHashMatches = null
- currentMatchPosition = -1
-
- while (currentHashMatches == null && streamIter.hasNext) {
- currentStreamedRow = streamIter.next()
- if (!joinKeys(currentStreamedRow).anyNull) {
- currentHashMatches = hashTable.get(joinKeys.currentValue)
+ def fullOuterIterator(key: Row, leftIter: Iterable[Row], rightIter:
Iterable[Row])
+ : Iterator[Row] = {
+ if(!key.anyNull) {
+ val rightMatchedSet = scala.collection.mutable.Set[Int]()
+ leftIter.iterator.flatMap[Row] { l =>
+ joinedRow.setLeftRow(l)
+ var matched = false
+ rightIter.zipWithIndex.collect {
+ case (r, idx) if (predicateWithSettingRight(joinedRow, r))=> {
+ matched = true
+ rightMatchedSet.add(idx)
+ joinedRow.copy
}
- }
-
- if (currentHashMatches == null) {
- false
- } else {
- currentMatchPosition = 0
- true
- }
+ } ++ SINGLE_NULL_LIST.collect {
--- End diff --
For better readability, let's use vals to hold intermediate results ;)
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---