Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19327#discussion_r140910781
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
---
@@ -324,17 +389,33 @@ case class StreamingSymmetricHashJoinExec(
}
}
- /** Remove old buffered state rows using watermarks for state keys and
values */
- def removeOldState(): Unit = {
+ /**
+ * Builds an iterator over old state key-value pairs, removing them
lazily as they're produced.
+ *
+ * @note This iterator must be consumed fully before any other
operations are made
+ * against this joiner's join state manager. For efficiency reasons,
the intermediate states of
+ * the iterator leave the state manager in an invalid configuration.
+ *
+ * We do this to avoid requiring either two passes or full
materialization when
+ * processing the rows for outer join.
+ */
+ def removeOldState(): Iterator[UnsafeRowPair] = {
stateWatermarkPredicate match {
case Some(JoinStateKeyWatermarkPredicate(expr)) =>
joinStateManager.removeByKeyCondition(stateKeyWatermarkPredicateFunc)
case Some(JoinStateValueWatermarkPredicate(expr)) =>
joinStateManager.removeByValueCondition(stateValueWatermarkPredicateFunc)
- case _ =>
+ case _ => Iterator()
--- End diff --
You can use `Iterator.empty` here instead of `Iterator()`; it makes the intent more obvious.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]