HeartSaVioR commented on a change in pull request #33081:
URL: https://github.com/apache/spark/pull/33081#discussion_r670865513



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -511,6 +513,298 @@ case class StateStoreSaveExec(
     copy(child = newChild)
 }
 
+/**
+ * This class sorts input rows and existing sessions in state and provides 
output rows as
+ * sorted by "group keys + start time of session window".
+ *
+ * Refer [[MergingSortWithSessionWindowStateIterator]] for more details.
+ */
+case class SessionWindowStateStoreRestoreExec(
+    keyWithoutSessionExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    stateInfo: Option[StatefulOperatorStateInfo],
+    eventTimeWatermark: Option[Long],
+    stateFormatVersion: Int,
+    child: SparkPlan)
+  extends UnaryExecNode with StateStoreReader with WatermarkSupport {
+
+  override def keyExpressions: Seq[Attribute] = keyWithoutSessionExpressions
+
+  private val stateManager = 
StreamingSessionWindowStateManager.createStateManager(
+    keyWithoutSessionExpressions, sessionExpression, child.output, 
stateFormatVersion)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    assert(keyExpressions.nonEmpty, "Grouping key must be specified when using 
sessionWindow")
+
+    child.execute().mapPartitionsWithReadStateStore(
+      getStateInfo,
+      stateManager.getStateKeySchema,
+      stateManager.getStateValueSchema,
+      numColsPrefixKey = stateManager.getNumColsForPrefixKey,
+      session.sessionState,
+      Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
+
+      // We need to filter out outdated inputs
+      val filteredIterator = watermarkPredicateForData match {
+        case Some(predicate) => iter.filter((row: InternalRow) => 
!predicate.eval(row))
+        case None => iter
+      }
+
+      new MergingSortWithSessionWindowStateIterator(
+        filteredIterator,
+        stateManager,
+        store,
+        keyWithoutSessionExpressions,
+        sessionExpression,
+        child.output).map { row =>
+        numOutputRows += 1
+        row
+      }
+    }
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = {
+    (keyWithoutSessionExpressions ++ Seq(sessionExpression)).map(SortOrder(_, 
Ascending))
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (keyWithoutSessionExpressions.isEmpty) {

Review comment:
       Yeah. Probably better to move the assertion to initialization instead of 
doExecute, and change the code to ignore the case where the key expression is empty. Thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to