HeartSaVioR commented on code in PR #36090:
URL: https://github.com/apache/spark/pull/36090#discussion_r844568096
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -597,22 +603,30 @@ class SymmetricHashJoinStateManager(
/**
* Get all values and indices for the provided key.
* Should not return null.
+ * Note that we will skip nulls explicitly if config setting for the same
is
+ * set to true via STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS.
*/
def getAll(key: UnsafeRow, numValues: Long):
Iterator[KeyWithIndexAndValue] = {
- val keyWithIndexAndValue = new KeyWithIndexAndValue()
- var index = 0
new NextIterator[KeyWithIndexAndValue] {
+ private val keyWithIndexAndValue = new KeyWithIndexAndValue()
+ private var index: Long = 0L
+
+ private def hasMoreValues = index < numValues
override protected def getNext(): KeyWithIndexAndValue = {
- if (index >= numValues) {
- finished = true
- null
- } else {
+ while (hasMoreValues) {
Review Comment:
Yes I'm not sure I see the case of "out of bound". Could you please
elaborate?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]