anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1530889290
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -341,8 +475,67 @@ case class TransformWithStateExec(
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
processDataWithPartition(singleIterator, store, processorHandle)
}
+
+ private def processDataWithInitialState(
+ store: StateStore,
+ childDataIterator: Iterator[InternalRow],
+ initStateIterator: Iterator[InternalRow]):
+ CompletionIterator[InternalRow, Iterator[InternalRow]] = {
+ val processorHandle = new StatefulProcessorHandleImpl(store,
getStateInfo.queryRunId,
+ keyEncoder, timeoutMode, isStreaming)
+ assert(processorHandle.getHandleState ==
StatefulProcessorHandleState.CREATED)
+ statefulProcessor.setHandle(processorHandle)
+ statefulProcessor.init(outputMode, timeoutMode)
+ processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
+
+ // Check whether this is the first batch (batch id 0);
+ // initial state is only processed for the first batch.
+ if (processorHandle.getQueryInfo().getBatchId == 0) {
+ // If the user provided initial state, we need to have the initial state
and the
+ // data in the same partition so that we can still have just one commit
at the end.
+ val groupedInitialStateIter =
+ GroupedIterator(initStateIterator,
+ initialStateGroupingAttrs, initialState.output)
+ groupedInitialStateIter.foreach {
+ case (keyRow, valueRowIter) =>
+ processInitialStateRows(keyRow.asInstanceOf[UnsafeRow],
+ valueRowIter)
+ }
+ }
+
+ processDataWithPartition(childDataIterator, store, processorHandle,
Option(initStateIterator))
+ }
+
+ /** This class zips two RDDs together into the same partition, and returns the
partition id */
+ class ZipPartitionsWithIndexRDD[A: ClassTag, B: ClassTag, V: ClassTag](
Review Comment:
Could we reuse `stateStoreAwareZipPartitions` here instead of adding a new `ZipPartitionsWithIndexRDD` class?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]