HeartSaVioR commented on a change in pull request #33081:
URL: https://github.com/apache/spark/pull/33081#discussion_r670213455
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -190,19 +191,21 @@ class UpdatingSessionsIterator(
   }
   private def closeCurrentSession(keyChanged: Boolean): Unit = {
-    assert(returnRowsIter == null || !returnRowsIter.hasNext)
-
     returnRows = rowsForCurrentSession
     rowsForCurrentSession = null
-    val groupingKey = generateGroupingKey()
+    val groupingKey = generateGroupingKey().copy()
     val currentRowsIter = returnRows.generateIterator().map { internalRow =>
       val valueRow = valueProj(internalRow)
       restoreProj(join2(groupingKey, valueRow)).copy()
     }
-    returnRowsIter = currentRowsIter
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      returnRowsIter = returnRowsIter ++ currentRowsIter
+    } else {
+      returnRowsIter = currentRowsIter
+    }
Review comment:
I found an edge case where each of the last two input rows closes a session.
When that happens, `returnRowsIter` is non-null and still has rows when this
method is called (which is why I removed the assertion), so we have to append
the new iterator to the existing one instead of simply replacing it.
That is also why I added `.copy()` to `groupingKey`: the mapped iterator is
evaluated lazily, and without the copy the two iterators could end up using
the same key, which they must not.
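
To illustrate the lazy-evaluation issue outside of Spark, here is a minimal sketch. `MutableKey`, `LazyKeyDemo`, `sessionIter`, and `run` are hypothetical names made up for this example, and `MutableKey` only stands in for a reused mutable row buffer; it is an assumption about why the key gets shared, not the actual Spark code.

```scala
// Hypothetical stand-in for a mutable row buffer that is reused between calls.
final class MutableKey(var value: String) {
  def copy(): MutableKey = new MutableKey(value)
}

object LazyKeyDemo {
  // Builds the output iterator for one session. Iterator.map is lazy, so
  // `key.value` is read only when the resulting iterator is consumed.
  def sessionIter(key: MutableKey, rows: Seq[Int]): Iterator[String] =
    rows.iterator.map(r => s"${key.value}:$r")

  def run(copyKey: Boolean): List[String] = {
    val shared = new MutableKey("a") // one buffer reused for every grouping key
    def currentKey(): MutableKey = if (copyKey) shared.copy() else shared

    // First session is closed, but its rows are not consumed yet.
    var out: Iterator[String] = sessionIter(currentKey(), Seq(1, 2))

    // Second session is closed right after; the reused buffer is overwritten.
    shared.value = "b"
    val next = sessionIter(currentKey(), Seq(3))

    // Append instead of replacing, as in the diff above.
    out = if (out.hasNext) out ++ next else next
    out.toList
  }
}
```

In this sketch, `LazyKeyDemo.run(copyKey = false)` yields `List("b:1", "b:2", "b:3")`, so the first session's rows pick up the second session's key, while `LazyKeyDemo.run(copyKey = true)` yields `List("a:1", "a:2", "b:3")`.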