anishshri-db commented on code in PR #47641:
URL: https://github.com/apache/spark/pull/47641#discussion_r1706212242
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -198,7 +204,29 @@ case class TransformWithStateExec(
getOutputRow(obj)
}
ImplicitGroupingKeyTracker.removeImplicitKey()
- mappedIterator
+
+ // Wrapper to ensure that the implicit key is set when the methods on the iterator
+ // are called. Inside of processNewData, we use a GroupedIterator, so handleInputRows
+ // is only called once per key. As such, we don't strictly need to set/unset the
+ // implicit key on every call to next(); we just need to set it on the first call
+ // to hasNext and unset it after the last call to next. However, such an optimization
+ // relies on knowing this implementation detail of processNewData, so we set/unset the
+ // implicit key on every call to a mappedIterator's public methods to be safe.
+ new Iterator[InternalRow] {
+ override def hasNext: Boolean = {
+ ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
Review Comment:
Is it possible to set the implicit key once and remove it once over the lifetime of the iterator, rather than on every call to `hasNext` and `next`?
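
To make the question concrete, here is a rough sketch of what a set-once/remove-once wrapper could look like. It is not code from this PR: `KeyTracker` is a hypothetical thread-local stand-in for `ImplicitGroupingKeyTracker`, and `KeyScopedIterator` is an illustrative name. It sets the key on the first call to `hasNext` and removes it once the underlying iterator reports that it is exhausted.

```scala
// Hypothetical stand-in for ImplicitGroupingKeyTracker: a thread-local slot
// holding the grouping key that is currently being processed.
object KeyTracker {
  private val current = new ThreadLocal[Option[Any]] {
    override def initialValue(): Option[Any] = None
  }
  def set(key: Any): Unit = current.set(Some(key))
  def remove(): Unit = current.set(None)
  def get: Option[Any] = current.get()
}

// Illustrative wrapper: sets the key once, on the first hasNext, and removes
// it once, when the underlying iterator is exhausted.
final class KeyScopedIterator[T](key: Any, underlying: Iterator[T])
    extends Iterator[T] {
  private var keySet = false
  private var finished = false

  override def hasNext: Boolean = {
    if (finished) {
      false
    } else {
      if (!keySet) {
        KeyTracker.set(key)
        keySet = true
      }
      val more = underlying.hasNext
      if (!more) {
        // Last element already consumed: clear the key exactly once.
        KeyTracker.remove()
        finished = true
      }
      more
    }
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("next on empty iterator")
    underlying.next()
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    // Iterator.map is lazy, so the key is read only when elements are pulled.
    val rows = Iterator(1, 2, 3).map(v => (KeyTracker.get, v))
    val out = new KeyScopedIterator("k1", rows).toList
    println(out)
    println(KeyTracker.get)
  }
}
```

Two caveats with this variant, which may be why the PR sets and unsets on every call: the key stays set between calls, so it is only correct if iterators for different keys are never interleaved on the same thread, and if the consumer stops before exhausting the iterator, the key is never removed.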
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]