c21 commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509658931
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
/**
* Get all the matched values for given join condition, with marking matched.
* This method is designed to mark joined rows properly without exposing
internal index of row.
+ *
+ * @param excludeRowsAlreadyMatched Do not join with rows already matched
previously.
+ * This is used for right side of left semi
join in
+ * [[StreamingSymmetricHashJoinExec]] only.
*/
def getJoinedRows(
key: UnsafeRow,
generateJoinedRow: InternalRow => JoinedRow,
- predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
+ predicate: JoinedRow => Boolean,
+ excludeRowsAlreadyMatched: Boolean = false): Iterator[JoinedRow] = {
val numValues = keyToNumValues.get(key)
- keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
+ keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue =>
Review comment:
FYI, I created https://issues.apache.org/jira/browse/SPARK-33211 for this
follow-up.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
##########
@@ -291,17 +291,17 @@ object UnsupportedOperationChecker extends Logging {
throwError("Full outer joins with streaming
DataFrames/Datasets are not supported")
}
- case LeftSemi | LeftAnti =>
+ case LeftAnti =>
if (right.isStreaming) {
- throwError("Left semi/anti joins with a streaming
DataFrame/Dataset " +
+ throwError("Left anti joins with a streaming DataFrame/Dataset
" +
"on the right are not supported")
}
// We support streaming left outer joins with static on the right
always, and with
Review comment:
@xuanyuanking - updated.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,204 @@ class StreamingOuterJoinSuite extends StreamTest with
StateStoreMetricsTest with
)
}
}
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+ import testImplicits._
+
+ test("windowed left semi join") {
+ val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+ CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+ // states
+ // left: 1, 2, 3, 4 ,5
+ // right: 3, 4, 5, 6, 7
+ assertNumStateRows(total = 10, updated = 10),
+ MultiAddData(leftInput, 21)(rightInput, 22),
+ // Watermark = 11, should remove rows having window=[0,10].
+ CheckNewAnswer(),
+ // states
+ // left: 21
+ // right: 22
+ //
+ // states evicted
+ // left: 1, 2, 3, 4 ,5 (below watermark)
+ // right: 3, 4, 5, 6, 7 (below watermark)
+ assertNumStateRows(total = 2, updated = 2),
+ AddData(leftInput, 22),
+ CheckNewAnswer(Row(22, 30, 44)),
+ // Unlike inner/outer joins, given left input row matches with right
input row,
+ // we don't buffer the matched left input row to the state store.
+ //
+ // states
+ // left: 21
+ // right: 22
+ assertNumStateRows(total = 2, updated = 0),
+ StopStream,
+ StartStream(),
+
+ AddData(leftInput, 1),
+ // Row not add as 1 < state key watermark = 12.
+ CheckNewAnswer(),
+ // states
+ // left: 21
+ // right: 22
+ assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1),
+ AddData(rightInput, 5),
+ // Row not add as 5 < state key watermark = 12.
+ CheckNewAnswer(),
+ // states
+ // left: 21
+ // right: 22
+ assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
+ )
+ }
+
+ test("left semi early state exclusion on left") {
+ val (leftInput, rightInput, joined) =
setupWindowedJoinWithLeftCondition("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
+ // The left rows with leftValue <= 4 should not generate their semi join
rows and
+ // not get added to the state.
+ CheckNewAnswer(Row(3, 10, 6)),
+ // states
+ // left: 3
+ // right: 3, 4, 5
+ assertNumStateRows(total = 4, updated = 4),
+ // We shouldn't get more semi join rows when the watermark advances.
+ MultiAddData(leftInput, 20)(rightInput, 21),
+ CheckNewAnswer(),
+ // states
+ // left: 20
+ // right: 21
+ //
+ // states evicted
+ // left: 3 (below watermark)
+ // right: 3, 4, 5 (below watermark)
+ assertNumStateRows(total = 2, updated = 2),
+ AddData(rightInput, 20),
+ CheckNewAnswer((20, 30, 40)),
+ // states
+ // left: 20
+ // right: 21, 20
+ assertNumStateRows(total = 3, updated = 1)
+ )
+ }
+
+ test("left semi early state exclusion on right") {
+ val (leftInput, rightInput, joined) =
setupWindowedJoinWithRightCondition("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
+ // The right rows with rightValue <= 7 should never be added to the
state.
+ // The right row with rightValue = 9 > 7, hence joined and added to
state.
+ CheckNewAnswer(Row(3, 10, 6)),
+ // states
+ // left: 3, 4, 5
+ // right: 3
+ assertNumStateRows(total = 4, updated = 4),
+ // We shouldn't get more semi join rows when the watermark advances.
+ MultiAddData(leftInput, 20)(rightInput, 21),
+ CheckNewAnswer(),
+ // states
+ // left: 20
+ // right: 21
+ //
+ // states evicted
+ // left: 3, 4, 5 (below watermark)
+ // right: 3 (below watermark)
+ assertNumStateRows(total = 2, updated = 2),
+ AddData(rightInput, 20),
+ CheckNewAnswer((20, 30, 40)),
+ // states
+ // left: 20
+ // right: 21, 20
+ assertNumStateRows(total = 3, updated = 1)
+ )
+ }
+
+ test("left semi join with watermark range condition") {
+ val (leftInput, rightInput, joined) =
setupWindowedJoinWithRangeCondition("left_semi")
+
+ testStream(joined)(
+ AddData(leftInput, (1, 5), (3, 5)),
+ CheckNewAnswer(),
+ // states
+ // left: (1, 5), (3, 5)
+ // right: nothing
+ assertNumStateRows(total = 2, updated = 2),
+ AddData(rightInput, (1, 10), (2, 5)),
+ // Match left row in the state.
+ CheckNewAnswer((1, 5)),
+ // states
+ // left: (1, 5), (3, 5)
+ // right: (1, 10), (2, 5)
+ assertNumStateRows(total = 4, updated = 2),
+ AddData(rightInput, (1, 11)),
+ // No match as left time is too low and left row is already matched.
+ CheckNewAnswer(),
+ // states
+ // left: (1, 5), (3, 5)
+ // right: (1, 10), (2, 5), (1, 11)
+ assertNumStateRows(total = 5, updated = 1),
+ // Increase event time watermark to 20s by adding data with time = 30s
on both inputs.
+ AddData(leftInput, (1, 7), (1, 30)),
+ CheckNewAnswer((1, 7)),
+ // states
+ // left: (1, 5), (3, 5), (1, 30)
+ // right: (1, 10), (2, 5), (1, 11)
+ assertNumStateRows(total = 6, updated = 1),
+ // Watermark = 30 - 10 = 20, no matched row.
+ AddData(rightInput, (0, 30)),
+ CheckNewAnswer(),
+ // states
+ // left: (1, 30)
+ // right: (0, 30)
+ //
+ // states evicted
+ // left: (1, 5), (3, 5) (below watermark = 20)
+ // right: (1, 10), (2, 5), (1, 11) (below watermark = 20)
+ assertNumStateRows(total = 2, updated = 1)
+ )
+ }
+
+ test("self left semi join") {
+ val (inputStream, query) = setupWindowedSelfJoin("left_semi")
+
+ testStream(query)(
+ AddData(inputStream, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
+ CheckNewAnswer((2, 2), (4, 4)),
+ // batch 1 - global watermark = 0
+ // states
+ // left: (2, 2L), (4, 4L)
+ // (left rows with value % 2 != 0 is filtered per
[[PushDownLeftSemiAntiJoin]])
+ // right: (2, 2L), (4, 4L)
Review comment:
@HeartSaVioR - updated. I also figured out that the optimization rule should be
`PushPredicateThroughJoin` instead of `PushDownLeftSemiAntiJoin`, so I updated
the comment as well.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,204 @@ class StreamingOuterJoinSuite extends StreamTest with
StateStoreMetricsTest with
)
}
}
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+ import testImplicits._
+
+ test("windowed left semi join") {
+ val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+ CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+ // states
+ // left: 1, 2, 3, 4 ,5
+ // right: 3, 4, 5, 6, 7
+ assertNumStateRows(total = 10, updated = 10),
+ MultiAddData(leftInput, 21)(rightInput, 22),
+ // Watermark = 11, should remove rows having window=[0,10].
+ CheckNewAnswer(),
+ // states
+ // left: 21
+ // right: 22
+ //
+ // states evicted
+ // left: 1, 2, 3, 4 ,5 (below watermark)
+ // right: 3, 4, 5, 6, 7 (below watermark)
+ assertNumStateRows(total = 2, updated = 2),
+ AddData(leftInput, 22),
+ CheckNewAnswer(Row(22, 30, 44)),
+ // Unlike inner/outer joins, given left input row matches with right
input row,
+ // we don't buffer the matched left input row to the state store.
+ //
+ // states
+ // left: 21
+ // right: 22
+ assertNumStateRows(total = 2, updated = 0),
+ StopStream,
+ StartStream(),
+
+ AddData(leftInput, 1),
+ // Row not add as 1 < state key watermark = 12.
+ CheckNewAnswer(),
+ // states
+ // left: 21
+ // right: 22
+ assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1),
+ AddData(rightInput, 5),
+ // Row not add as 5 < state key watermark = 12.
+ CheckNewAnswer(),
+ // states
+ // left: 21
+ // right: 22
+ assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
+ )
+ }
+
+ test("left semi early state exclusion on left") {
+ val (leftInput, rightInput, joined) =
setupWindowedJoinWithLeftCondition("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
+ // The left rows with leftValue <= 4 should not generate their semi join
rows and
+ // not get added to the state.
+ CheckNewAnswer(Row(3, 10, 6)),
+ // states
+ // left: 3
+ // right: 3, 4, 5
+ assertNumStateRows(total = 4, updated = 4),
+ // We shouldn't get more semi join rows when the watermark advances.
+ MultiAddData(leftInput, 20)(rightInput, 21),
+ CheckNewAnswer(),
+ // states
+ // left: 20
+ // right: 21
+ //
+ // states evicted
+ // left: 3 (below watermark)
+ // right: 3, 4, 5 (below watermark)
+ assertNumStateRows(total = 2, updated = 2),
+ AddData(rightInput, 20),
+ CheckNewAnswer((20, 30, 40)),
+ // states
+ // left: 20
+ // right: 21, 20
+ assertNumStateRows(total = 3, updated = 1)
+ )
+ }
+
+ test("left semi early state exclusion on right") {
+ val (leftInput, rightInput, joined) =
setupWindowedJoinWithRightCondition("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
+ // The right rows with rightValue <= 7 should never be added to the
state.
+ // The right row with rightValue = 9 > 7, hence joined and added to
state.
+ CheckNewAnswer(Row(3, 10, 6)),
+ // states
+ // left: 3, 4, 5
+ // right: 3
+ assertNumStateRows(total = 4, updated = 4),
+ // We shouldn't get more semi join rows when the watermark advances.
+ MultiAddData(leftInput, 20)(rightInput, 21),
+ CheckNewAnswer(),
+ // states
+ // left: 20
+ // right: 21
+ //
+ // states evicted
+ // left: 3, 4, 5 (below watermark)
+ // right: 3 (below watermark)
+ assertNumStateRows(total = 2, updated = 2),
+ AddData(rightInput, 20),
+ CheckNewAnswer((20, 30, 40)),
+ // states
+ // left: 20
+ // right: 21, 20
+ assertNumStateRows(total = 3, updated = 1)
+ )
+ }
+
+ test("left semi join with watermark range condition") {
+ val (leftInput, rightInput, joined) =
setupWindowedJoinWithRangeCondition("left_semi")
+
+ testStream(joined)(
+ AddData(leftInput, (1, 5), (3, 5)),
+ CheckNewAnswer(),
+ // states
+ // left: (1, 5), (3, 5)
+ // right: nothing
+ assertNumStateRows(total = 2, updated = 2),
+ AddData(rightInput, (1, 10), (2, 5)),
+ // Match left row in the state.
+ CheckNewAnswer((1, 5)),
+ // states
+ // left: (1, 5), (3, 5)
+ // right: (1, 10), (2, 5)
+ assertNumStateRows(total = 4, updated = 2),
+ AddData(rightInput, (1, 11)),
+ // No match as left time is too low and left row is already matched.
Review comment:
@HeartSaVioR - sounds good, updated.
##########
File path:
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
_.join(_, joinType = LeftSemi),
streamStreamSupported = false,
batchStreamSupported = false,
- expectedMsg = "left semi/anti joins")
+ expectedMsg = "LeftSemi join")
+
+ // Left semi joins: update and complete mode not allowed
+ assertNotSupportedInStreamingPlan(
+ "left semi join with stream-stream relations and update mode",
+ streamRelation.join(streamRelation, joinType = LeftSemi,
+ condition = Some(attribute === attribute)),
+ OutputMode.Update(),
+ Seq("is not supported in Update output mode"))
+ assertNotSupportedInStreamingPlan(
+ "left semi join with stream-stream relations and complete mode",
+ Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType
= LeftSemi,
+ condition = Some(attribute === attribute))),
+ OutputMode.Complete(),
+ Seq("is not supported in Complete output mode"))
+
+ // Left semi joins: stream-stream allowed with join on watermark attribute
+ // Note that the attribute need not be watermarked on both sides.
+ assertSupportedInStreamingPlan(
+ "left semi join with stream-stream relations and join on attribute with
left watermark",
+ streamRelation.join(streamRelation, joinType = LeftSemi,
+ condition = Some(attributeWithWatermark === attribute)),
+ OutputMode.Append())
+ assertSupportedInStreamingPlan(
+ "left semi join with stream-stream relations and join on attribute with
right watermark",
+ streamRelation.join(streamRelation, joinType = LeftSemi,
+ condition = Some(attribute === attributeWithWatermark)),
+ OutputMode.Append())
+ assertNotSupportedInStreamingPlan(
+ "left semi join with stream-stream relations and join on non-watermarked
attribute",
+ streamRelation.join(streamRelation, joinType = LeftSemi,
+ condition = Some(attribute === attribute)),
+ OutputMode.Append(),
+ Seq("watermark in the join keys"))
Review comment:
@xuanyuanking - yeah, I agree that adding "without" would be better. I updated
it for the left semi join here. A refactoring for all joins (inner, outer, semi,
anti, etc.) is needed anyway as a follow-up JIRA
(https://issues.apache.org/jira/browse/SPARK-33209), so I want to clean up the
other places in a separate PR; e.g. "appropriate range condition" has a similar
problem.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]