c21 commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508876329
##########
File path:
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
_.join(_, joinType = LeftSemi),
streamStreamSupported = false,
batchStreamSupported = false,
- expectedMsg = "left semi/anti joins")
+ expectedMsg = "LeftSemi join")
+
+ // Left semi joins: update and complete mode not allowed
+ assertNotSupportedInStreamingPlan(
+ s"left semi join with stream-stream relations and update mode",
+ streamRelation.join(streamRelation, joinType = LeftSemi,
+ condition = Some(attribute === attribute)),
+ OutputMode.Update(),
+ Seq("is not supported in Update output mode"))
+ assertNotSupportedInStreamingPlan(
+ s"left semi join with stream-stream relations and complete mode",
Review comment:
@viirya - updated for all places.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -246,37 +249,43 @@ case class StreamingSymmetricHashJoinExec(
// Join one side input using the other side's buffered/state rows. Here is how it is done.
//
- // - `leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner)` generates all rows from
- //   matching new left input with stored right input, and also stores all the left input
+ // - `leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner)`
+ //   - inner, left outer, right outer join: generates all rows from matching new left input
Review comment:
@viirya - updated.
##########
File path:
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
_.join(_, joinType = LeftSemi),
streamStreamSupported = false,
batchStreamSupported = false,
- expectedMsg = "left semi/anti joins")
+ expectedMsg = "LeftSemi join")
+
+ // Left semi joins: update and complete mode not allowed
+ assertNotSupportedInStreamingPlan(
+ s"left semi join with stream-stream relations and update mode",
Review comment:
@viirya - updated.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -347,22 +360,28 @@ case class StreamingSymmetricHashJoinExec(
// Processing time between inner output completion and here comes from the outer portion of a
Review comment:
@viirya - updated.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
/**
* Get all the matched values for given join condition, with marking matched.
* This method is designed to mark joined rows properly without exposing internal index of row.
+ *
+ * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.
Review comment:
@HeartSaVioR - after more thought, I feel the early eviction is more complicated and needs more consideration.
For left semi join state store eviction:
* [if the watermark predicate is on value](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L542), we can add a method to `SymmetricHashJoinStateManager`, or change the current method `SymmetricHashJoinStateManager.removeByValueCondition()`, to remove matched values while iterating over all values.
* [if the watermark predicate is on key](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L540), only the values of keys that pass the predicate are iterated, not all values in the state store. So we need a more efficient way to evict matched values here; otherwise we would have to iterate over all keys and values to find the matched ones.

So I think this piece of code may still be needed (not removed completely). Maybe we can store the matched rows in some other data structure after getting all the matched rows here; a rough sketch of the value-predicate case follows below. I feel a follow-up PR is appropriate, WDYT? @HeartSaVioR
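
A minimal sketch of the value-predicate idea, assuming state rows carry a "matched" flag. `StateRow` and the standalone `removeByValueCondition` below are illustrative stand-ins, not the actual `SymmetricHashJoinStateManager` API:

```scala
// Simplified model: each buffered state row remembers whether it has already
// produced a left semi output. Eviction can then drop a row either because its
// value is below the watermark or because it has already been matched.
case class StateRow(key: Int, value: Long, matched: Boolean)

object LeftSemiEvictionSketch {
  // Returns (rowsToKeep, rowsToEvict). `belowWatermark` stands in for the
  // value-side watermark predicate; `alsoEvictMatched` is the extra condition
  // a left semi join would enable on top of the existing removal behavior.
  def removeByValueCondition(
      state: Seq[StateRow],
      belowWatermark: Long => Boolean,
      alsoEvictMatched: Boolean): (Seq[StateRow], Seq[StateRow]) = {
    state.partition { row =>
      !(belowWatermark(row.value) || (alsoEvictMatched && row.matched))
    }
  }

  def main(args: Array[String]): Unit = {
    val state = Seq(
      StateRow(1, 15L, matched = true),   // evicted: already matched, even though above watermark
      StateRow(2, 3L, matched = false),   // evicted: value below watermark
      StateRow(3, 20L, matched = false))  // kept: above watermark and not yet matched
    val (kept, evicted) = removeByValueCondition(state, _ < 10L, alsoEvictMatched = true)
    println(s"kept = $kept")
    println(s"evicted = $evicted")
  }
}
```

The key-predicate case would not fit this shape, since only the values of keys passing the predicate are scanned rather than all values, which is why keeping the matched-flag bookkeeping here and deferring the eviction optimization to a follow-up seems reasonable.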
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -347,22 +360,28 @@ case class StreamingSymmetricHashJoinExec(
// Processing time between inner output completion and here comes from the outer portion of a
// join, and thus counts as removal time as we remove old state from one side while iterating.
- if (innerOutputCompletionTimeNs != 0) {
+ if (hashJoinOutputCompletionTimeNs != 0) {
    allRemovalsTimeMs +=
-     math.max(NANOSECONDS.toMillis(System.nanoTime - innerOutputCompletionTimeNs), 0)
+     math.max(NANOSECONDS.toMillis(System.nanoTime - hashJoinOutputCompletionTimeNs), 0)
  }
  allRemovalsTimeMs += timeTakenMs {
    // Remove any remaining state rows which aren't needed because they're below the watermark.
    //
-   // For inner joins, we have to remove unnecessary state rows from both sides if possible.
+   // For inner and left semi joins, we have to remove unnecessary state rows from both sides
+   // if possible.
+   //
    // For outer joins, we have already removed unnecessary state rows from the outer side
    // (e.g., left side for left outer join) while generating the outer "null" outputs. Now, we
    // have to remove unnecessary state rows from the other side (e.g., right side for the left
    // outer join) if possible. In all cases, nothing needs to be outputted, hence the removal
    // needs to be done greedily by immediately consuming the returned iterator.
+   //
+   // For left semi joins, we have to remove unnecessary state rows from both sides if
+   // possible.
Review comment:
@viirya - removed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]