jerrypeng commented on code in PR #36090:
URL: https://github.com/apache/spark/pull/36090#discussion_r845624420


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -597,22 +603,30 @@ class SymmetricHashJoinStateManager(
     /**
      * Get all values and indices for the provided key.
      * Should not return null.
+     * Note that we will skip nulls explicitly if config setting for the same 
is
+     * set to true via STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS.
      */
     def getAll(key: UnsafeRow, numValues: Long): 
Iterator[KeyWithIndexAndValue] = {
-      val keyWithIndexAndValue = new KeyWithIndexAndValue()
-      var index = 0
       new NextIterator[KeyWithIndexAndValue] {
+        private val keyWithIndexAndValue = new KeyWithIndexAndValue()
+        private var index: Long = 0L
+
+        private def hasMoreValues = index < numValues
         override protected def getNext(): KeyWithIndexAndValue = {
-          if (index >= numValues) {
-            finished = true
-            null
-          } else {
+          while (hasMoreValues) {

Review Comment:
   Oh I just realized its a "def" must have thought it was just a variable. 
This is not an issue then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to