Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19327#discussion_r140606013
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
/**
* Remove using a predicate on keys. See class docs for more context and implement details.
*/
- def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
- val allKeyToNumValues = keyToNumValues.iterator
-
- while (allKeyToNumValues.hasNext) {
- val keyToNumValue = allKeyToNumValues.next
- if (condition(keyToNumValue.key)) {
- keyToNumValues.remove(keyToNumValue.key)
- keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue)
+ def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = {
+ new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+ private val allKeyToNumValues = keyToNumValues.iterator
+
+ private var currentKeyToNumValue: Option[KeyAndNumValues] = None
--- End diff --
Don't use Option/Some inside tight, performance-sensitive loops. Every Some(...) is a new object, so wrapping the current entry on each iteration creates garbage very fast. It's fine to use null here for performance reasons.
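A minimal, self-contained sketch of the suggested pattern, written outside Spark. It does not use Spark's NextIterator, UnsafeRow, or the state store. `NullSentinelSketch`, the `mutable.LinkedHashMap[String, Int]` store, and the key-only output are hypothetical simplifications. The iterator removes matching keys lazily, as the new `removeByKeyCondition` does, and keeps the pending key in a nullable field instead of an `Option`. This assumes keys are never null, because null is reserved as the "nothing pending" marker.

```scala
import scala.collection.mutable

object NullSentinelSketch {

  // Hypothetical simplification: lazily removes every key of `store` that satisfies
  // `condition` and yields the removed keys one at a time, in the store's order.
  // Assumes keys are never null, since null marks "no pending key".
  def removeByKeyCondition(
      store: mutable.LinkedHashMap[String, Int],
      condition: String => Boolean): Iterator[String] = {
    // Snapshot the keys so that removing entries cannot disturb the traversal.
    val keys: Array[String] = store.keys.toArray

    new Iterator[String] {
      private var idx = 0
      // null means "no matching key is pending". Declaring this as Option[String]
      // would allocate a fresh Some(...) every time a match is found.
      private var currentKey: String = null

      // Moves forward until a matching key is pending or the keys run out.
      private def advance(): Unit = {
        while (currentKey == null && idx < keys.length) {
          val k = keys(idx)
          idx += 1
          if (condition(k)) {
            store -= k // unlike remove(k), -= does not wrap the old value in an Option
            currentKey = k
          }
        }
      }

      override def hasNext: Boolean = { advance(); currentKey != null }

      override def next(): String = {
        if (!hasNext) throw new NoSuchElementException("no more matching keys")
        val k = currentKey
        currentKey = null // clear it so the following call advances again
        k
      }
    }
  }
}
```

The trade-off is that null must never be a legitimate value, and every reader of the field has to check for it. That is why Option remains the usual Scala default outside hot paths.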
---