HeartSaVioR commented on a change in pull request #33081:
URL: https://github.com/apache/spark/pull/33081#discussion_r670893597
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -511,6 +513,298 @@ case class StateStoreSaveExec(
copy(child = newChild)
}
+/**
+ * This class sorts input rows and existing sessions in state, and provides output rows
+ * sorted by "group keys + start time of session window".
+ *
+ * Refer to [[MergingSortWithSessionWindowStateIterator]] for more details.
+ */
+case class SessionWindowStateStoreRestoreExec(
+ keyWithoutSessionExpressions: Seq[Attribute],
+ sessionExpression: Attribute,
+ stateInfo: Option[StatefulOperatorStateInfo],
+ eventTimeWatermark: Option[Long],
+ stateFormatVersion: Int,
+ child: SparkPlan)
+ extends UnaryExecNode with StateStoreReader with WatermarkSupport {
+
+ override def keyExpressions: Seq[Attribute] = keyWithoutSessionExpressions
+
+ private val stateManager = StreamingSessionWindowStateManager.createStateManager(
+   keyWithoutSessionExpressions, sessionExpression, child.output, stateFormatVersion)
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ val numOutputRows = longMetric("numOutputRows")
+ assert(keyExpressions.nonEmpty, "Grouping key must be specified when using sessionWindow")
+
+ child.execute().mapPartitionsWithReadStateStore(
+ getStateInfo,
+ stateManager.getStateKeySchema,
+ stateManager.getStateValueSchema,
+ numColsPrefixKey = stateManager.getNumColsForPrefixKey,
+ session.sessionState,
+ Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
+
+ // We need to filter out outdated inputs
+ val filteredIterator = watermarkPredicateForData match {
+ case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
+ case None => iter
+ }
+
+ new MergingSortWithSessionWindowStateIterator(
+ filteredIterator,
+ stateManager,
+ store,
+ keyWithoutSessionExpressions,
+ sessionExpression,
+ child.output).map { row =>
+ numOutputRows += 1
+ row
+ }
+ }
+ }
+
+ override def output: Seq[Attribute] = child.output
+
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
+ override def outputOrdering: Seq[SortOrder] = {
+ (keyWithoutSessionExpressions ++ Seq(sessionExpression)).map(SortOrder(_, Ascending))
+ }
+
+ override def requiredChildDistribution: Seq[Distribution] = {
+ if (keyWithoutSessionExpressions.isEmpty) {
+ AllTuples :: Nil
+ } else {
+ ClusteredDistribution(keyWithoutSessionExpressions, stateInfo.map(_.numPartitions)) :: Nil
+ }
+ }
+
+ override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+ Seq((keyWithoutSessionExpressions ++ Seq(sessionExpression)).map(SortOrder(_, Ascending)))
+ }
+
+ override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+ copy(child = newChild)
+}
+
+/**
+ * For each input tuple, the key is calculated and the tuple is `put` into the [[StateStore]].
+ */
+case class SessionWindowStateStoreSaveExec(
+ keyExpressions: Seq[Attribute],
+ sessionExpression: Attribute,
+ stateInfo: Option[StatefulOperatorStateInfo] = None,
+ outputMode: Option[OutputMode] = None,
+ eventTimeWatermark: Option[Long] = None,
+ stateFormatVersion: Int,
+ child: SparkPlan)
+ extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+ private val keyWithoutSessionExpressions = keyExpressions.filterNot { p =>
Review comment:
Actually the name has been confusing; let's do the same with
`SessionWindowStateStoreRestoreExec`.
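
For readers following along, here is a minimal sketch of the kind of streaming query these operators serve. It assumes the `session_window` function introduced alongside this work; the rate source, key column, and gap duration are illustrative, not taken from this PR:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, session_window}

// Minimal sketch, assuming the session_window function added with native
// session window support. Grouping by a key plus session_window is what the
// planner would translate into SessionWindowStateStoreRestoreExec /
// SessionWindowStateStoreSaveExec nodes in the physical plan.
val spark = SparkSession.builder().appName("session-window-sketch").getOrCreate()

// Toy streaming source; a real query would read from Kafka, files, etc.
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .withColumn("userId", col("value") % 10)

// Sessionize events per user with a 10-minute gap, counting events per session.
val sessionCounts = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(col("userId"), session_window(col("timestamp"), "10 minutes"))
  .agg(count("*").as("numEvents"))
```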
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]