jingz-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1531068752
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -127,13 +152,53 @@ case class TransformWithStateExec(
mappedIterator
}
+ private def processInitialStateRows(
+ keyRow: UnsafeRow,
+ initStateIter: Iterator[InternalRow]): Unit = {
+ val getKeyObj =
+ ObjectOperator.deserializeRowToObject(keyDeserializer,
groupingAttributes)
+
+ val getStateValueObj =
+ ObjectOperator.deserializeRowToObject(initialStateDeserializer,
initialStateDataAttrs)
+
+ val keyObj = getKeyObj(keyRow) // convert key to objects
+ ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
+ val initStateObjIter = initStateIter.map(getStateValueObj.apply)
+
+ initStateObjIter.foreach { initState =>
+ statefulProcessor
+ .asInstanceOf[StatefulProcessorWithInitialState[Any, Any, Any, Any]]
+ .handleInitialState(keyObj, initState)
+ }
+ ImplicitGroupingKeyTracker.removeImplicitKey()
+ }
+
private def processNewData(dataIter: Iterator[InternalRow]):
Iterator[InternalRow] = {
val groupedIter = GroupedIterator(dataIter, groupingAttributes,
child.output)
groupedIter.flatMap { case (keyRow, valueRowIter) =>
val keyUnsafeRow = keyRow.asInstanceOf[UnsafeRow]
handleInputRows(keyUnsafeRow, valueRowIter)
}
}
+// TODO double check this
+ private def processNewDataWithInitialState(
+ dataIter: Iterator[InternalRow],
+ initStateIter: Iterator[InternalRow]): Iterator[InternalRow] = {
+
+ val groupedChildDataIter = GroupedIterator(dataIter, groupingAttributes,
child.output)
+ val groupedInitialStateIter =
+ GroupedIterator(initStateIter, initialStateGroupingAttrs,
initialState.output)
+
+ // Create a CoGroupedIterator that will group the two iterators together
for every key group.
+ new CoGroupedIterator(
+ groupedChildDataIter, groupedInitialStateIter,
groupingAttributes).flatMap {
+ case (keyRow, valueRowIter, initialStateRowIter) =>
+ // TODO in design doc: trying to re-initialize state for the same
+ // grouping key will result in an error?
Review Comment:
Not sure I understand the scenario correctly: does it mean the user tries to
reassign state variable values in the user-defined function `handleInitialState()`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]