Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19327#discussion_r140605588
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
---
@@ -324,17 +367,34 @@ case class StreamingSymmetricHashJoinExec(
}
}
- /** Remove old buffered state rows using watermarks for state keys and values */
- def removeOldState(): Unit = {
+ /**
+ * Builds an iterator over old state key-value pairs, removing them lazily as they're produced.
+ *
+ * This iterator is dangerous! It must be consumed fully before any other operations are made
+ * against this joiner's join state manager, and in particular commits must not happen while
+ * this iterator is ongoing. The intermediate states of the iterator leave the state manager in
+ * an invalid configuration.
+ *
+ * We do this unsafe thing to avoid requiring either two passes or full materialization when
+ * processing the rows for outer join.
+ */
+ def removeOldState(): Iterator[(UnsafeRow, UnsafeRow)] = {
--- End diff --
Use `UnsafeRowPair` instead of `Tuple2` (the `(UnsafeRow, UnsafeRow)` syntax is shorthand for
`scala.Tuple2`). It lets you reuse a single pair object instead of allocating a new tuple per
element, which avoids creating a lot of short-lived objects and thus reduces GC pressure.
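To make the suggestion concrete, here is a self-contained sketch of a lazily removing iterator that hands out one reused pair object instead of a fresh tuple per element. It is illustrative only: `ReusablePair` and this `removeOldState` are hypothetical stand-ins (not Spark's actual `UnsafeRowPair` or join code), a `java.util.HashMap` stands in for the state store, and `Long` keys compared against a `watermark` stand in for the real watermark predicate.

```scala
import java.util.{HashMap => JHashMap}

object RemoveOldStateSketch {
  // Hypothetical stand-in for a reusable key/value holder; mutated in place on every element.
  final class ReusablePair[K, V](var key: K, var value: V) {
    def withRows(k: K, v: V): ReusablePair[K, V] = { key = k; value = v; this }
  }

  // Lazily removes and yields entries whose key is below `watermark`.
  // Every next() returns the SAME `shared` object, so no per-element tuple is allocated.
  // Like the iterator in the diff, it must be fully consumed before `state` is used again.
  def removeOldState[V](
      state: JHashMap[java.lang.Long, V],
      watermark: Long): Iterator[ReusablePair[Long, V]] =
    new Iterator[ReusablePair[Long, V]] {
      private val entries = state.entrySet().iterator()
      private val shared = new ReusablePair[Long, V](0L, null.asInstanceOf[V])
      private var ready = false // true while `shared` holds an entry not yet returned

      def hasNext: Boolean = {
        // Scan forward to the next expired entry; removal happens as it is fetched.
        while (!ready && entries.hasNext) {
          val e = entries.next()
          if (e.getKey < watermark) {
            shared.withRows(e.getKey, e.getValue)
            entries.remove()
            ready = true
          }
        }
        ready
      }

      def next(): ReusablePair[Long, V] = {
        if (!hasNext) throw new NoSuchElementException("no more expired entries")
        ready = false
        shared
      }
    }
}
```

Because the pair is overwritten on each step, a caller must copy out whatever it needs (for example `it.map(_.value)`) before advancing; calling `toList` directly on the iterator would produce a list of references to one and the same object.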