hvanhovell commented on code in PR #40997:
URL: https://github.com/apache/spark/pull/40997#discussion_r1181852165
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -837,6 +837,76 @@ class Dataset[T] private[sql] (
}
}
+ /**
+ * Joins this Dataset returning a `Tuple2` for each pair where `condition`
evaluates to true.
+ *
+ * This is similar to the relation `join` function with one important
difference in the result
+ * schema. Since `joinWith` preserves objects present on either side of the
join, the result
+ * schema is similarly nested into a tuple under the column names `_1` and
`_2`.
+ *
+ * This type of join can be useful both for preserving type-safety with the
original object
+ * types as well as working with relational data where either side of the
join has column names
+ * in common.
+ *
+ * @param other
+ * Right side of the join.
+ * @param condition
+ * Join expression.
+ * @param joinType
+ * Type of join to perform. Default `inner`. Must be one of: `inner`,
`cross`, `outer`,
+ * `full`, `fullouter`,`full_outer`, `left`, `leftouter`, `left_outer`,
`right`, `rightouter`,
+ * `right_outer`.
+ *
+ * @group typedrel
+ * @since 3.5.0
+ */
+ def joinWith[U](other: Dataset[U], condition: Column, joinType: String):
Dataset[(T, U)] = {
+ val joinTypeValue = toJoinType(joinType, skipSemiAnti = true)
+ val joinedNullables = joinTypeValue match {
+ case proto.Join.JoinType.JOIN_TYPE_INNER |
proto.Join.JoinType.JOIN_TYPE_CROSS =>
+ Seq(false, false)
+ case proto.Join.JoinType.JOIN_TYPE_FULL_OUTER =>
+ Seq(true, true)
+ case proto.Join.JoinType.JOIN_TYPE_LEFT_OUTER =>
+ Seq(false, true)
+ case proto.Join.JoinType.JOIN_TYPE_RIGHT_OUTER =>
+ Seq(true, false)
+ case e =>
+ throw new IllegalArgumentException(s"Unsupported join type `joinType`:
$e")
+ }
+
+ val tupleEncoder = ProductEncoder
+ .tuple(Seq(this.encoder, other.encoder), Some(joinedNullables))
+ .asInstanceOf[AgnosticEncoder[(T, U)]]
+
+ sparkSession.newDataset(tupleEncoder) { builder =>
+ val joinBuilder = builder.getJoinBuilder
+ joinBuilder
+ .setLeft(plan.getRoot)
+ .setRight(other.plan.getRoot)
+ .setJoinType(joinTypeValue)
+ .setJoinCondition(condition.expr)
+ .setLeftSchema(DataTypeProtoConverter.toConnectProtoType(this.schema))
Review Comment:
Yeah, but passing the encoders only tells the server that a different output
schema is expected, i.e. `struct(left.*), struct(right.*)` instead of `left.*,
right.*`. There is a precedent in SQL called `NATURAL JOIN`. I would probably
add a field to the Join message called `leftAndRightAsStruct` or something in
that vein.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]