HeartSaVioR commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508269154
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -41,18 +41,174 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
+abstract class StreamingJoinSuite
+ extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
-class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest
with BeforeAndAfter {
+ import testImplicits._
before {
- SparkSession.setActiveSessionInternal(spark) // set this before force
initializing 'joinExec'
- spark.streams.stateStoreCoordinator // initialize the lazy coordinator
+ SparkSession.setActiveSessionInternal(spark) // set this before force
initializing 'joinExec'
+ spark.streams.stateStoreCoordinator // initialize the lazy coordinator
}
after {
StateStore.stop()
}
+ protected def setupStream(prefix: String, multiplier: Int):
(MemoryStream[Int], DataFrame) = {
+ val input = MemoryStream[Int]
+ val df = input.toDF
+ .select(
+ 'value as "key",
+ timestamp_seconds($"value") as s"${prefix}Time",
+ ('value * multiplier) as s"${prefix}Value")
+ .withWatermark(s"${prefix}Time", "10 seconds")
+
+ (input, df)
+ }
+
+ protected def setupWindowedJoin(joinType: String)
+ : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+ val (input1, df1) = setupStream("left", 2)
+ val (input2, df2) = setupStream("right", 3)
+ val windowed1 = df1.select('key, window('leftTime, "10 second"),
'leftValue)
+ val windowed2 = df2.select('key, window('rightTime, "10 second"),
'rightValue)
+ val joined = windowed1.join(windowed2, Seq("key", "window"), joinType)
+ val select = if (joinType == "left_semi") {
Review comment:
For reviewers: this is equivalent to
`StreamingOuterJoinSuite.setupWindowedJoin` with changing
1) signature `private` to `protected`
2) conditional select on left_semi vs others, as in left_semi only left side
of columns are available
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -41,18 +41,174 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
+abstract class StreamingJoinSuite
+ extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
-class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest
with BeforeAndAfter {
+ import testImplicits._
before {
- SparkSession.setActiveSessionInternal(spark) // set this before force
initializing 'joinExec'
- spark.streams.stateStoreCoordinator // initialize the lazy coordinator
+ SparkSession.setActiveSessionInternal(spark) // set this before force
initializing 'joinExec'
+ spark.streams.stateStoreCoordinator // initialize the lazy coordinator
}
after {
StateStore.stop()
}
+ protected def setupStream(prefix: String, multiplier: Int):
(MemoryStream[Int], DataFrame) = {
Review comment:
It'd be pretty much helpful to provide guide comments on tracking
refactors.
e.g. this is equivalent to `StreamingOuterJoinSuite.setupStream` with
changing signature `private` to `protected` to co-use.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,104 @@ class StreamingOuterJoinSuite extends StreamTest with
StateStoreMetricsTest with
)
}
}
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+ import testImplicits._
+
+ test("windowed left semi join") {
+ val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+ CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+ MultiAddData(leftInput, 21)(rightInput, 22),
+ // Watermark = 11, should remove rows having window=[0,10]
+ CheckNewAnswer(),
+ assertNumStateRows(total = 2, updated = 12),
+ AddData(leftInput, 22),
+ CheckNewAnswer(Row(22, 30, 44)),
+ assertNumStateRows(total = 2, updated = 0),
+ StopStream,
+ StartStream(),
+
+ AddData(leftInput, 1),
+ // Row not add as 1 < state key watermark = 12
+ CheckNewAnswer(),
+ AddData(rightInput, 11),
+ // Row not add as 11 < state key watermark = 12
+ CheckNewAnswer()
+ )
+ }
+
+ test("left semi early state exclusion on left") {
+ val (leftInput, rightInput, joined) =
setupWindowedJoinWithLeftCondition("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3, 4, 5),
+ // The left rows with leftValue <= 4 should not generate their semi join
row and
+ // not get added to the state.
+ CheckNewAnswer(Row(3, 10, 6)),
+ assertNumStateRows(total = 4, updated = 4),
Review comment:
Can you describe the details on the state rows, and add the same num
state rows verification on below after each CheckNewAnswer?
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,104 @@ class StreamingOuterJoinSuite extends StreamTest with
StateStoreMetricsTest with
)
}
}
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+ import testImplicits._
+
+ test("windowed left semi join") {
+ val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+ CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+ MultiAddData(leftInput, 21)(rightInput, 22),
+ // Watermark = 11, should remove rows having window=[0,10]
+ CheckNewAnswer(),
+ assertNumStateRows(total = 2, updated = 12),
+ AddData(leftInput, 22),
+ CheckNewAnswer(Row(22, 30, 44)),
+ assertNumStateRows(total = 2, updated = 0),
Review comment:
It might be better to add a code comment why there's no change on state.
(e.g. Unlike inner/outer joins, given left input row matches with right
input row, we don't buffer the left input row to the state.)
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -41,18 +41,174 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
+abstract class StreamingJoinSuite
+ extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
-class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest
with BeforeAndAfter {
+ import testImplicits._
before {
- SparkSession.setActiveSessionInternal(spark) // set this before force
initializing 'joinExec'
- spark.streams.stateStoreCoordinator // initialize the lazy coordinator
+ SparkSession.setActiveSessionInternal(spark) // set this before force
initializing 'joinExec'
+ spark.streams.stateStoreCoordinator // initialize the lazy coordinator
}
after {
StateStore.stop()
}
+ protected def setupStream(prefix: String, multiplier: Int):
(MemoryStream[Int], DataFrame) = {
+ val input = MemoryStream[Int]
+ val df = input.toDF
+ .select(
+ 'value as "key",
+ timestamp_seconds($"value") as s"${prefix}Time",
+ ('value * multiplier) as s"${prefix}Value")
+ .withWatermark(s"${prefix}Time", "10 seconds")
+
+ (input, df)
+ }
+
+ protected def setupWindowedJoin(joinType: String)
+ : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+ val (input1, df1) = setupStream("left", 2)
+ val (input2, df2) = setupStream("right", 3)
+ val windowed1 = df1.select('key, window('leftTime, "10 second"),
'leftValue)
+ val windowed2 = df2.select('key, window('rightTime, "10 second"),
'rightValue)
+ val joined = windowed1.join(windowed2, Seq("key", "window"), joinType)
+ val select = if (joinType == "left_semi") {
+ joined.select('key, $"window.end".cast("long"), 'leftValue)
+ } else {
+ joined.select('key, $"window.end".cast("long"), 'leftValue, 'rightValue)
+ }
+
+ (input1, input2, select)
+ }
+
+ protected def setupWindowedJoinWithLeftCondition(joinType: String)
Review comment:
For reviewers: this is extracted from `test("left outer early state
exclusion on left")` / `test("right outer early state exclusion on left")`,
with adding select per join type.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -41,18 +41,174 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
+abstract class StreamingJoinSuite
+ extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
-class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest
with BeforeAndAfter {
+ import testImplicits._
before {
- SparkSession.setActiveSessionInternal(spark) // set this before force
initializing 'joinExec'
- spark.streams.stateStoreCoordinator // initialize the lazy coordinator
+ SparkSession.setActiveSessionInternal(spark) // set this before force
initializing 'joinExec'
+ spark.streams.stateStoreCoordinator // initialize the lazy coordinator
}
after {
StateStore.stop()
}
+ protected def setupStream(prefix: String, multiplier: Int):
(MemoryStream[Int], DataFrame) = {
+ val input = MemoryStream[Int]
+ val df = input.toDF
+ .select(
+ 'value as "key",
+ timestamp_seconds($"value") as s"${prefix}Time",
+ ('value * multiplier) as s"${prefix}Value")
+ .withWatermark(s"${prefix}Time", "10 seconds")
+
+ (input, df)
+ }
+
+ protected def setupWindowedJoin(joinType: String)
+ : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+ val (input1, df1) = setupStream("left", 2)
+ val (input2, df2) = setupStream("right", 3)
+ val windowed1 = df1.select('key, window('leftTime, "10 second"),
'leftValue)
+ val windowed2 = df2.select('key, window('rightTime, "10 second"),
'rightValue)
+ val joined = windowed1.join(windowed2, Seq("key", "window"), joinType)
+ val select = if (joinType == "left_semi") {
+ joined.select('key, $"window.end".cast("long"), 'leftValue)
+ } else {
+ joined.select('key, $"window.end".cast("long"), 'leftValue, 'rightValue)
+ }
+
+ (input1, input2, select)
+ }
+
+ protected def setupWindowedJoinWithLeftCondition(joinType: String)
+ : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+ val (leftInput, df1) = setupStream("left", 2)
+ val (rightInput, df2) = setupStream("right", 3)
+ // Use different schemas to ensure the null row is being generated from
the correct side.
+ val left = df1.select('key, window('leftTime, "10 second"), 'leftValue)
+ val right = df2.select('key, window('rightTime, "10 second"),
'rightValue.cast("string"))
+
+ val joined = left.join(
+ right,
+ left("key") === right("key")
+ && left("window") === right("window")
+ && 'leftValue > 4,
+ joinType)
+
+ val select = if (joinType == "left_semi") {
+ joined.select(left("key"), left("window.end").cast("long"), 'leftValue)
+ } else if (joinType == "left_outer") {
+ joined.select(left("key"), left("window.end").cast("long"), 'leftValue,
'rightValue)
+ } else if (joinType == "right_outer") {
+ joined.select(right("key"), right("window.end").cast("long"),
'leftValue, 'rightValue)
+ } else {
+ joined
+ }
+
+ (leftInput, rightInput, select)
+ }
+
+ protected def setupWindowedJoinWithRightCondition(joinType: String)
+ : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+ val (leftInput, df1) = setupStream("left", 2)
+ val (rightInput, df2) = setupStream("right", 3)
+ // Use different schemas to ensure the null row is being generated from
the correct side.
+ val left = df1.select('key, window('leftTime, "10 second"), 'leftValue)
+ val right = df2.select('key, window('rightTime, "10 second"),
'rightValue.cast("string"))
+
+ val joined = left.join(
+ right,
+ left("key") === right("key")
+ && left("window") === right("window")
+ && 'rightValue.cast("int") > 7,
+ joinType)
+
+ val select = if (joinType == "left_semi") {
+ joined.select(left("key"), left("window.end").cast("long"), 'leftValue)
+ } else if (joinType == "left_outer") {
+ joined.select(left("key"), left("window.end").cast("long"), 'leftValue,
'rightValue)
+ } else if (joinType == "right_outer") {
+ joined.select(right("key"), right("window.end").cast("long"),
'leftValue, 'rightValue)
+ } else {
+ joined
+ }
+
+ (leftInput, rightInput, select)
+ }
+
+ protected def setupWindowedJoinWithRangeCondition(joinType: String)
+ : (MemoryStream[(Int, Int)], MemoryStream[(Int, Int)], DataFrame) = {
+
+ val leftInput = MemoryStream[(Int, Int)]
+ val rightInput = MemoryStream[(Int, Int)]
+
+ val df1 = leftInput.toDF.toDF("leftKey", "time")
+ .select('leftKey, timestamp_seconds($"time") as "leftTime", ('leftKey *
2) as "leftValue")
+ .withWatermark("leftTime", "10 seconds")
+
+ val df2 = rightInput.toDF.toDF("rightKey", "time")
+ .select('rightKey, timestamp_seconds($"time") as "rightTime",
+ ('rightKey * 3) as "rightValue")
+ .withWatermark("rightTime", "10 seconds")
+
+ val joined =
+ df1.join(
+ df2,
+ expr("leftKey = rightKey AND " +
+ "leftTime BETWEEN rightTime - interval 5 seconds AND rightTime +
interval 5 seconds"),
+ joinType)
+
+ val select = if (joinType == "left_semi") {
+ joined.select('leftKey, 'leftTime.cast("int"))
+ } else {
+ joined.select('leftKey, 'rightKey, 'leftTime.cast("int"),
'rightTime.cast("int"))
+ }
+
+ (leftInput, rightInput, select)
+ }
+
+ protected def setupWindowedSelfJoin(joinType: String)
Review comment:
For reviewers: this is extracted from `test("SPARK-26187 self left outer
join should not return outer nulls for already matched rows")`, with
conditional select on left_semi vs others, as in left_semi only left side of
columns are available.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -41,18 +41,174 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
+abstract class StreamingJoinSuite
+ extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
-class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest
with BeforeAndAfter {
+ import testImplicits._
before {
- SparkSession.setActiveSessionInternal(spark) // set this before force
initializing 'joinExec'
- spark.streams.stateStoreCoordinator // initialize the lazy coordinator
+ SparkSession.setActiveSessionInternal(spark) // set this before force
initializing 'joinExec'
+ spark.streams.stateStoreCoordinator // initialize the lazy coordinator
}
after {
StateStore.stop()
}
+ protected def setupStream(prefix: String, multiplier: Int):
(MemoryStream[Int], DataFrame) = {
+ val input = MemoryStream[Int]
+ val df = input.toDF
+ .select(
+ 'value as "key",
+ timestamp_seconds($"value") as s"${prefix}Time",
+ ('value * multiplier) as s"${prefix}Value")
+ .withWatermark(s"${prefix}Time", "10 seconds")
+
+ (input, df)
+ }
+
+ protected def setupWindowedJoin(joinType: String)
+ : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+ val (input1, df1) = setupStream("left", 2)
+ val (input2, df2) = setupStream("right", 3)
+ val windowed1 = df1.select('key, window('leftTime, "10 second"),
'leftValue)
+ val windowed2 = df2.select('key, window('rightTime, "10 second"),
'rightValue)
+ val joined = windowed1.join(windowed2, Seq("key", "window"), joinType)
+ val select = if (joinType == "left_semi") {
+ joined.select('key, $"window.end".cast("long"), 'leftValue)
+ } else {
+ joined.select('key, $"window.end".cast("long"), 'leftValue, 'rightValue)
+ }
+
+ (input1, input2, select)
+ }
+
+ protected def setupWindowedJoinWithLeftCondition(joinType: String)
+ : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+ val (leftInput, df1) = setupStream("left", 2)
+ val (rightInput, df2) = setupStream("right", 3)
+ // Use different schemas to ensure the null row is being generated from
the correct side.
+ val left = df1.select('key, window('leftTime, "10 second"), 'leftValue)
+ val right = df2.select('key, window('rightTime, "10 second"),
'rightValue.cast("string"))
+
+ val joined = left.join(
+ right,
+ left("key") === right("key")
+ && left("window") === right("window")
+ && 'leftValue > 4,
+ joinType)
+
+ val select = if (joinType == "left_semi") {
+ joined.select(left("key"), left("window.end").cast("long"), 'leftValue)
+ } else if (joinType == "left_outer") {
+ joined.select(left("key"), left("window.end").cast("long"), 'leftValue,
'rightValue)
+ } else if (joinType == "right_outer") {
+ joined.select(right("key"), right("window.end").cast("long"),
'leftValue, 'rightValue)
+ } else {
+ joined
+ }
+
+ (leftInput, rightInput, select)
+ }
+
+ protected def setupWindowedJoinWithRightCondition(joinType: String)
+ : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+ val (leftInput, df1) = setupStream("left", 2)
+ val (rightInput, df2) = setupStream("right", 3)
+ // Use different schemas to ensure the null row is being generated from
the correct side.
+ val left = df1.select('key, window('leftTime, "10 second"), 'leftValue)
+ val right = df2.select('key, window('rightTime, "10 second"),
'rightValue.cast("string"))
+
+ val joined = left.join(
+ right,
+ left("key") === right("key")
+ && left("window") === right("window")
+ && 'rightValue.cast("int") > 7,
+ joinType)
+
+ val select = if (joinType == "left_semi") {
+ joined.select(left("key"), left("window.end").cast("long"), 'leftValue)
+ } else if (joinType == "left_outer") {
+ joined.select(left("key"), left("window.end").cast("long"), 'leftValue,
'rightValue)
+ } else if (joinType == "right_outer") {
+ joined.select(right("key"), right("window.end").cast("long"),
'leftValue, 'rightValue)
+ } else {
+ joined
+ }
+
+ (leftInput, rightInput, select)
+ }
+
+ protected def setupWindowedJoinWithRangeCondition(joinType: String)
Review comment:
For reviewers: this is extracted from
`test(s"${joinType.replaceAllLiterally("_", " ")} with watermark range
condition")`, with conditional select on left_semi vs others, as in left_semi
only left side of columns are available.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -41,18 +41,174 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
+abstract class StreamingJoinSuite
+ extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
-class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest
with BeforeAndAfter {
+ import testImplicits._
before {
- SparkSession.setActiveSessionInternal(spark) // set this before force
initializing 'joinExec'
- spark.streams.stateStoreCoordinator // initialize the lazy coordinator
+ SparkSession.setActiveSessionInternal(spark) // set this before force
initializing 'joinExec'
+ spark.streams.stateStoreCoordinator // initialize the lazy coordinator
}
after {
StateStore.stop()
}
+ protected def setupStream(prefix: String, multiplier: Int):
(MemoryStream[Int], DataFrame) = {
+ val input = MemoryStream[Int]
+ val df = input.toDF
+ .select(
+ 'value as "key",
+ timestamp_seconds($"value") as s"${prefix}Time",
+ ('value * multiplier) as s"${prefix}Value")
+ .withWatermark(s"${prefix}Time", "10 seconds")
+
+ (input, df)
+ }
+
+ protected def setupWindowedJoin(joinType: String)
+ : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+ val (input1, df1) = setupStream("left", 2)
+ val (input2, df2) = setupStream("right", 3)
+ val windowed1 = df1.select('key, window('leftTime, "10 second"),
'leftValue)
+ val windowed2 = df2.select('key, window('rightTime, "10 second"),
'rightValue)
+ val joined = windowed1.join(windowed2, Seq("key", "window"), joinType)
+ val select = if (joinType == "left_semi") {
+ joined.select('key, $"window.end".cast("long"), 'leftValue)
+ } else {
+ joined.select('key, $"window.end".cast("long"), 'leftValue, 'rightValue)
+ }
+
+ (input1, input2, select)
+ }
+
+ protected def setupWindowedJoinWithLeftCondition(joinType: String)
+ : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+ val (leftInput, df1) = setupStream("left", 2)
+ val (rightInput, df2) = setupStream("right", 3)
+ // Use different schemas to ensure the null row is being generated from
the correct side.
+ val left = df1.select('key, window('leftTime, "10 second"), 'leftValue)
+ val right = df2.select('key, window('rightTime, "10 second"),
'rightValue.cast("string"))
+
+ val joined = left.join(
+ right,
+ left("key") === right("key")
+ && left("window") === right("window")
+ && 'leftValue > 4,
+ joinType)
+
+ val select = if (joinType == "left_semi") {
+ joined.select(left("key"), left("window.end").cast("long"), 'leftValue)
+ } else if (joinType == "left_outer") {
+ joined.select(left("key"), left("window.end").cast("long"), 'leftValue,
'rightValue)
+ } else if (joinType == "right_outer") {
+ joined.select(right("key"), right("window.end").cast("long"),
'leftValue, 'rightValue)
+ } else {
+ joined
+ }
+
+ (leftInput, rightInput, select)
+ }
+
+ protected def setupWindowedJoinWithRightCondition(joinType: String)
Review comment:
For reviewers: this is extracted from `test("left outer early state
exclusion on right")` / `test("right outer early state exclusion on right")`,
with adding select per join type.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,104 @@ class StreamingOuterJoinSuite extends StreamTest with
StateStoreMetricsTest with
)
}
}
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+ import testImplicits._
+
+ test("windowed left semi join") {
+ val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+ CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+ MultiAddData(leftInput, 21)(rightInput, 22),
+ // Watermark = 11, should remove rows having window=[0,10]
+ CheckNewAnswer(),
+ assertNumStateRows(total = 2, updated = 12),
+ AddData(leftInput, 22),
+ CheckNewAnswer(Row(22, 30, 44)),
+ assertNumStateRows(total = 2, updated = 0),
+ StopStream,
+ StartStream(),
+
+ AddData(leftInput, 1),
Review comment:
I see you've referred the existing test code:
```
AddData(input2, 1),
CheckNewAnswer(), // Should not join as <
15 removed
assertNumStateRows(total = 2, updated = 0), // row not add as 1 <
state key watermark = 15
AddData(input1, 5),
CheckNewAnswer(), // Same reason as above
assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
```
could you please add `assertNumStateRows` per case here as well?
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,104 @@ class StreamingOuterJoinSuite extends StreamTest with
StateStoreMetricsTest with
)
}
}
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+ import testImplicits._
+
+ test("windowed left semi join") {
+ val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+ CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+ MultiAddData(leftInput, 21)(rightInput, 22),
+ // Watermark = 11, should remove rows having window=[0,10]
+ CheckNewAnswer(),
+ assertNumStateRows(total = 2, updated = 12),
+ AddData(leftInput, 22),
+ CheckNewAnswer(Row(22, 30, 44)),
+ assertNumStateRows(total = 2, updated = 0),
+ StopStream,
+ StartStream(),
+
+ AddData(leftInput, 1),
+ // Row not add as 1 < state key watermark = 12
+ CheckNewAnswer(),
+ AddData(rightInput, 11),
+ // Row not add as 11 < state key watermark = 12
+ CheckNewAnswer()
+ )
+ }
+
+ test("left semi early state exclusion on left") {
+ val (leftInput, rightInput, joined) =
setupWindowedJoinWithLeftCondition("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3, 4, 5),
+ // The left rows with leftValue <= 4 should not generate their semi join
row and
+ // not get added to the state.
+ CheckNewAnswer(Row(3, 10, 6)),
+ assertNumStateRows(total = 4, updated = 4),
+ // We shouldn't get more semi join rows when the watermark advances.
+ MultiAddData(leftInput, 20)(rightInput, 21),
+ CheckNewAnswer(),
+ AddData(rightInput, 20),
+ CheckNewAnswer((20, 30, 40))
+ )
+ }
+
+ test("left semi early state exclusion on right") {
+ val (leftInput, rightInput, joined) =
setupWindowedJoinWithRightCondition("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
+ // The right rows with rightValue <= 7 should never be added to the
state.
+ // The right row with rightValue = 9 > 7, hence joined and added to
state.
+ CheckNewAnswer(Row(3, 10, 6)),
+ assertNumStateRows(total = 4, updated = 4),
+ // We shouldn't get more semi join rows when the watermark advances.
+ MultiAddData(leftInput, 20)(rightInput, 21),
+ CheckNewAnswer(),
+ AddData(rightInput, 20),
+ CheckNewAnswer((20, 30, 40))
+ )
+ }
+
+ test("left semi join with watermark range condition") {
+ val (leftInput, rightInput, joined) =
setupWindowedJoinWithRangeCondition("left_semi")
+
+ testStream(joined)(
+ AddData(leftInput, (1, 5), (3, 5)),
+ CheckAnswer(),
+ AddData(rightInput, (1, 10), (2, 5)),
+ CheckNewAnswer((1, 5)),
+ AddData(rightInput, (1, 11)),
+ // No match as left time is too low and left row is already matched.
+ CheckNewAnswer(),
+ assertNumStateRows(total = 5, updated = 5),
+
+ // Increase event time watermark to 20s by adding data with time = 30s
on left input.
+ AddData(leftInput, (1, 7), (1, 30)),
+ CheckNewAnswer((1, 7)),
+ assertNumStateRows(total = 6, updated = 1),
+ // Watermark = 30 - 10 = 20, no matched row.
+ AddData(rightInput, (0, 30)),
+ CheckNewAnswer(),
+ assertNumStateRows(total = 2, updated = 1)
+ )
+ }
+
+ test("self left semi join") {
+ val (inputStream, query) = setupWindowedSelfJoin("left_semi")
+
+ testStream(query)(
+ AddData(inputStream, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
+ CheckNewAnswer((2, 2), (4, 4)),
Review comment:
Same here.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,104 @@ class StreamingOuterJoinSuite extends StreamTest with
StateStoreMetricsTest with
)
}
}
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+ import testImplicits._
+
+ test("windowed left semi join") {
+ val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+ CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+ MultiAddData(leftInput, 21)(rightInput, 22),
+ // Watermark = 11, should remove rows having window=[0,10]
+ CheckNewAnswer(),
+ assertNumStateRows(total = 2, updated = 12),
+ AddData(leftInput, 22),
+ CheckNewAnswer(Row(22, 30, 44)),
+ assertNumStateRows(total = 2, updated = 0),
+ StopStream,
+ StartStream(),
+
+ AddData(leftInput, 1),
+ // Row not add as 1 < state key watermark = 12
+ CheckNewAnswer(),
+ AddData(rightInput, 11),
+ // Row not add as 11 < state key watermark = 12
+ CheckNewAnswer()
+ )
+ }
+
+ test("left semi early state exclusion on left") {
+ val (leftInput, rightInput, joined) =
setupWindowedJoinWithLeftCondition("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3, 4, 5),
+ // The left rows with leftValue <= 4 should not generate their semi join
row and
+ // not get added to the state.
+ CheckNewAnswer(Row(3, 10, 6)),
+ assertNumStateRows(total = 4, updated = 4),
+ // We shouldn't get more semi join rows when the watermark advances.
+ MultiAddData(leftInput, 20)(rightInput, 21),
+ CheckNewAnswer(),
+ AddData(rightInput, 20),
+ CheckNewAnswer((20, 30, 40))
+ )
+ }
+
+ test("left semi early state exclusion on right") {
+ val (leftInput, rightInput, joined) =
setupWindowedJoinWithRightCondition("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
+ // The right rows with rightValue <= 7 should never be added to the
state.
+ // The right row with rightValue = 9 > 7, hence joined and added to
state.
+ CheckNewAnswer(Row(3, 10, 6)),
+ assertNumStateRows(total = 4, updated = 4),
+ // We shouldn't get more semi join rows when the watermark advances.
+ MultiAddData(leftInput, 20)(rightInput, 21),
+ CheckNewAnswer(),
+ AddData(rightInput, 20),
+ CheckNewAnswer((20, 30, 40))
+ )
+ }
+
+ test("left semi join with watermark range condition") {
+ val (leftInput, rightInput, joined) =
setupWindowedJoinWithRangeCondition("left_semi")
+
+ testStream(joined)(
+ AddData(leftInput, (1, 5), (3, 5)),
+ CheckAnswer(),
Review comment:
Same here.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,104 @@ class StreamingOuterJoinSuite extends StreamTest with
StateStoreMetricsTest with
)
}
}
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+ import testImplicits._
+
+ test("windowed left semi join") {
+ val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+ CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+ MultiAddData(leftInput, 21)(rightInput, 22),
+ // Watermark = 11, should remove rows having window=[0,10]
+ CheckNewAnswer(),
+ assertNumStateRows(total = 2, updated = 12),
+ AddData(leftInput, 22),
+ CheckNewAnswer(Row(22, 30, 44)),
+ assertNumStateRows(total = 2, updated = 0),
+ StopStream,
+ StartStream(),
+
+ AddData(leftInput, 1),
+ // Row not add as 1 < state key watermark = 12
+ CheckNewAnswer(),
+ AddData(rightInput, 11),
+ // Row not add as 11 < state key watermark = 12
+ CheckNewAnswer()
+ )
+ }
+
+ test("left semi early state exclusion on left") {
+ val (leftInput, rightInput, joined) =
setupWindowedJoinWithLeftCondition("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3, 4, 5),
+ // The left rows with leftValue <= 4 should not generate their semi join
row and
+ // not get added to the state.
+ CheckNewAnswer(Row(3, 10, 6)),
+ assertNumStateRows(total = 4, updated = 4),
+ // We shouldn't get more semi join rows when the watermark advances.
+ MultiAddData(leftInput, 20)(rightInput, 21),
+ CheckNewAnswer(),
+ AddData(rightInput, 20),
+ CheckNewAnswer((20, 30, 40))
+ )
+ }
+
+ test("left semi early state exclusion on right") {
+ val (leftInput, rightInput, joined) =
setupWindowedJoinWithRightCondition("left_semi")
+
+ testStream(joined)(
+ MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
+ // The right rows with rightValue <= 7 should never be added to the
state.
+ // The right row with rightValue = 9 > 7, hence joined and added to
state.
+ CheckNewAnswer(Row(3, 10, 6)),
+ assertNumStateRows(total = 4, updated = 4),
Review comment:
Same here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]