maropu commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r466172637
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -305,6 +312,23 @@ private[joins] class UnsafeHashedRelation(
override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
read(() => in.readInt(), () => in.readLong(), in.readBytes)
}
+
+ override def values(): Iterator[InternalRow] = {
+ val iter = binaryMap.iterator()
+
+ new Iterator[InternalRow] {
+ override def hasNext: Boolean = iter.hasNext
+
+ override def next(): InternalRow = {
+ if (!hasNext) {
+ throw new NoSuchElementException("End of the iterator")
+ }
+ val loc = iter.next()
+ resultRow.pointTo(loc.getValueBase, loc.getValueOffset,
loc.getValueLength)
+ resultRow
Review comment:
Could you update the comment of `resultRow`?
```
// re-used in get()/getValue()
var resultRow = new UnsafeRow(numFields)
```
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
##########
@@ -260,8 +260,8 @@ trait JoinSelectionHelper {
canBuildLocalHashMapBySize(right, conf) && muchSmaller(right, left)
}
getBuildSide(
- canBuildLeft(joinType) && buildLeft,
- canBuildRight(joinType) && buildRight,
+ canBuildShuffledHashJoinLeft(joinType) && buildLeft,
Review comment:
Is it okay to reuse the existing param (`autoBroadcastJoinThreshold`) as the
threshold here? I think we probably need a new conf for that.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -97,7 +102,9 @@ private[execution] object HashedRelation {
key: Seq[Expression],
sizeEstimate: Int = 64,
taskMemoryManager: TaskMemoryManager = null,
- isNullAware: Boolean = false): HashedRelation = {
+ isNullAware: Boolean = false,
+ isLookupAware: Boolean = false,
+ value: Option[Seq[Expression]] = None): HashedRelation = {
Review comment:
Could you add comments about what the new params mean in `@param`?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledJoin.scala
##########
@@ -40,4 +41,24 @@ trait ShuffledJoin extends BaseJoinExec {
throw new IllegalArgumentException(
s"ShuffledJoin should not take $x as the JoinType")
}
+
+ override def output: Seq[Attribute] = {
+ joinType match {
+ case _: InnerLike =>
+ left.output ++ right.output
+ case LeftOuter =>
+ left.output ++ right.output.map(_.withNullability(true))
+ case RightOuter =>
+ left.output.map(_.withNullability(true)) ++ right.output
+ case FullOuter =>
+ (left.output ++ right.output).map(_.withNullability(true))
+ case j: ExistenceJoin =>
+ left.output :+ j.exists
+ case LeftExistence(_) =>
+ left.output
+ case x =>
+ throw new IllegalArgumentException(
+ s"ShuffledJoin not take $x as the JoinType")
Review comment:
Why did you change `{getClass.getSimpleName}` to the hard-coded `ShuffledJoin`?
I think the previous one is better.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -305,6 +312,23 @@ private[joins] class UnsafeHashedRelation(
override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
read(() => in.readInt(), () => in.readLong(), in.readBytes)
}
+
+ override def values(): Iterator[InternalRow] = {
+ val iter = binaryMap.iterator()
+
+ new Iterator[InternalRow] {
+ override def hasNext: Boolean = iter.hasNext
+
+ override def next(): InternalRow = {
+ if (!hasNext) {
+ throw new NoSuchElementException("End of the iterator")
+ }
+ val loc = iter.next()
+ resultRow.pointTo(loc.getValueBase, loc.getValueOffset,
loc.getValueLength)
+ resultRow
Review comment:
Then, could you move this `values()` method close to `getValue()`?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -110,10 +117,10 @@ private[execution] object HashedRelation {
if (isNullAware && !input.hasNext) {
EmptyHashedRelation
- } else if (key.length == 1 && key.head.dataType == LongType) {
+ } else if (key.length == 1 && key.head.dataType == LongType &&
!isLookupAware) {
Review comment:
Can't we support `LongHashedRelation` for the `isLookupAware` case as well?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -327,23 +353,48 @@ private[joins] object UnsafeHashedRelation {
// Create a mapping of buildKeys -> rows
val keyGenerator = UnsafeProjection.create(key)
var numFields = 0
- while (input.hasNext) {
- val row = input.next().asInstanceOf[UnsafeRow]
- numFields = row.numFields()
- val key = keyGenerator(row)
- if (!key.anyNull) {
+
+ if (isLookupAware) {
+ // Add one extra boolean value at the end as part of the row,
+ // to track the information that whether the corresponding key
+ // has been looked up or not. See `ShuffledHashJoin.fullOuterJoin` for
example of usage.
+ val valueGenerator = UnsafeProjection.create(value.get :+ Literal(false))
+
+ while (input.hasNext) {
+ val row = input.next().asInstanceOf[UnsafeRow]
+ numFields = row.numFields() + 1
+ val key = keyGenerator(row)
+ val value = valueGenerator(row)
val loc = binaryMap.lookup(key.getBaseObject, key.getBaseOffset,
key.getSizeInBytes)
val success = loc.append(
key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
- row.getBaseObject, row.getBaseOffset, row.getSizeInBytes)
+ value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
if (!success) {
binaryMap.free()
// scalastyle:off throwerror
throw new SparkOutOfMemoryError("There is not enough memory to build
hash map")
// scalastyle:on throwerror
}
- } else if (isNullAware) {
- return EmptyHashedRelationWithAllNullKeys
+ }
+ } else {
+ while (input.hasNext) {
+ val row = input.next().asInstanceOf[UnsafeRow]
+ numFields = row.numFields()
+ val key = keyGenerator(row)
+ if (!key.anyNull) {
+ val loc = binaryMap.lookup(key.getBaseObject, key.getBaseOffset,
key.getSizeInBytes)
+ val success = loc.append(
+ key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
+ row.getBaseObject, row.getBaseOffset, row.getSizeInBytes)
+ if (!success) {
+ binaryMap.free()
+ // scalastyle:off throwerror
+ throw new SparkOutOfMemoryError("There is not enough memory to
build hash map")
+ // scalastyle:on throwerror
+ }
+ } else if (isNullAware) {
+ return EmptyHashedRelationWithAllNullKeys
+ }
Review comment:
nit format: how about reformatting it like this?
```
val append = (key: UnsafeRow, value: UnsafeRow) => {
val loc = binaryMap.lookup(key.getBaseObject, key.getBaseOffset,
key.getSizeInBytes)
val success = loc.append(
key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
if (!success) {
binaryMap.free()
// scalastyle:off throwerror
throw new SparkOutOfMemoryError("There is not enough memory to build
hash map")
// scalastyle:on throwerror
}
}
if (isLookupAware) {
// Add one extra boolean value at the end as part of the row,
// to track the information that whether the corresponding key
// has been looked up or not. See `ShuffledHashJoin.fullOuterJoin` for
example of usage.
val valueGenerator = UnsafeProjection.create(value.get :+
Literal(false))
while (input.hasNext) {
val row = input.next().asInstanceOf[UnsafeRow]
numFields = row.numFields() + 1
append(keyGenerator(row), valueGenerator(row))
}
} else {
while (input.hasNext) {
val row = input.next().asInstanceOf[UnsafeRow]
numFields = row.numFields()
val key = keyGenerator(row)
if (!key.anyNull) {
append(key, row)
} else if (isNullAware) {
return EmptyHashedRelationWithAllNullKeys
}
}
}
```
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -58,8 +65,19 @@ case class ShuffledHashJoinExec(
val buildTime = longMetric("buildTime")
val start = System.nanoTime()
val context = TaskContext.get()
+
+ val (isLookupAware, value) =
Review comment:
nit format:
```
val (isLookupAware, value) = if (joinType == FullOuter) {
(true, Some(BindReferences.bindReferences(buildOutput, buildOutput)))
} else {
(false, None)
}
```
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +89,134 @@ case class ShuffledHashJoinExec(
val numOutputRows = longMetric("numOutputRows")
streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter,
buildIter) =>
val hashed = buildHashedRelation(buildIter)
- join(streamIter, hashed, numOutputRows)
+ joinType match {
+ case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+ case _ => join(streamIter, hashed, numOutputRows)
+ }
+ }
+ }
+
+ /**
+ * Full outer shuffled hash join has three steps:
+ * 1. Construct hash relation from build side,
+ * with extra boolean value at the end of row to track look up information
+ * (done in `buildHashedRelation`).
+ * 2. Process rows from stream side by looking up hash relation,
+ * and mark the matched rows from build side be looked up.
+ * 3. Process rows from build side by iterating hash relation,
+ * and filter out rows from build side being looked up already.
+ */
+ private def fullOuterJoin(
+ streamIter: Iterator[InternalRow],
+ hashedRelation: HashedRelation,
+ numOutputRows: SQLMetric): Iterator[InternalRow] = {
+ abstract class HashJoinedRow extends JoinedRow {
Review comment:
Could we simply use a closure instead? e.g.,
```
val (joinRowWithStream, joinRowWithBuild) = {
val joinedRow = new JoinedRow()
buildSide match {
case BuildLeft => (joinedRow.withRight _, joinedRow.withLeft _)
case BuildRight => (joinedRow.withLeft _, joinedRow.withRight _)
}
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]