HeartSaVioR commented on a change in pull request #33081:
URL: https://github.com/apache/spark/pull/33081#discussion_r670894308



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -511,6 +513,298 @@ case class StateStoreSaveExec(
     copy(child = newChild)
 }
 
+/**
+ * This class sorts input rows and existing sessions in state and provides 
output rows as
+ * sorted by "group keys + start time of session window".
+ *
+ * Refer [[MergingSortWithSessionWindowStateIterator]] for more details.
+ */
+case class SessionWindowStateStoreRestoreExec(
+    keyWithoutSessionExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    stateInfo: Option[StatefulOperatorStateInfo],
+    eventTimeWatermark: Option[Long],
+    stateFormatVersion: Int,
+    child: SparkPlan)
+  extends UnaryExecNode with StateStoreReader with WatermarkSupport {
+
+  override def keyExpressions: Seq[Attribute] = keyWithoutSessionExpressions
+
+  private val stateManager = 
StreamingSessionWindowStateManager.createStateManager(
+    keyWithoutSessionExpressions, sessionExpression, child.output, 
stateFormatVersion)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    assert(keyExpressions.nonEmpty, "Grouping key must be specified when using 
sessionWindow")
+
+    child.execute().mapPartitionsWithReadStateStore(
+      getStateInfo,
+      stateManager.getStateKeySchema,
+      stateManager.getStateValueSchema,
+      numColsPrefixKey = stateManager.getNumColsForPrefixKey,
+      session.sessionState,
+      Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
+
+      // We need to filter out outdated inputs
+      val filteredIterator = watermarkPredicateForData match {
+        case Some(predicate) => iter.filter((row: InternalRow) => 
!predicate.eval(row))
+        case None => iter
+      }
+
+      new MergingSortWithSessionWindowStateIterator(
+        filteredIterator,
+        stateManager,
+        store,
+        keyWithoutSessionExpressions,
+        sessionExpression,
+        child.output).map { row =>
+        numOutputRows += 1
+        row
+      }
+    }
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = {
+    (keyWithoutSessionExpressions ++ Seq(sessionExpression)).map(SortOrder(_, 
Ascending))
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (keyWithoutSessionExpressions.isEmpty) {
+      AllTuples :: Nil
+    } else {
+      ClusteredDistribution(keyWithoutSessionExpressions, 
stateInfo.map(_.numPartitions)) :: Nil
+    }
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    Seq((keyWithoutSessionExpressions ++ 
Seq(sessionExpression)).map(SortOrder(_, Ascending)))
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    copy(child = newChild)
+}
+
+/**
+ * For each input tuple, the key is calculated and the tuple is `put` into the 
[[StateStore]].
+ */
+case class SessionWindowStateStoreSaveExec(
+    keyExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    outputMode: Option[OutputMode] = None,
+    eventTimeWatermark: Option[Long] = None,
+    stateFormatVersion: Int,
+    child: SparkPlan)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  private val keyWithoutSessionExpressions = keyExpressions.filterNot { p =>
+    p.semanticEquals(sessionExpression)
+  }
+
+  private val stateManager = 
StreamingSessionWindowStateManager.createStateManager(
+    keyWithoutSessionExpressions, sessionExpression, child.output, 
stateFormatVersion)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    metrics // force lazy init at driver
+    assert(outputMode.nonEmpty,
+      "Incorrect planning in IncrementalExecution, outputMode has not been 
set")
+    assert(keyExpressions.nonEmpty,
+      "Grouping key must be specified when using sessionWindow")
+
+    child.execute().mapPartitionsWithStateStore(
+      getStateInfo,
+      stateManager.getStateKeySchema,
+      stateManager.getStateValueSchema,
+      numColsPrefixKey = stateManager.getNumColsForPrefixKey,
+      session.sessionState,
+      Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
+
+      val numOutputRows = longMetric("numOutputRows")
+      val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
+      val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
+      val commitTimeMs = longMetric("commitTimeMs")
+
+      outputMode match {
+        // Update and output all rows in the StateStore.
+        case Some(Complete) =>
+          allUpdatesTimeMs += timeTakenMs {
+            putToStore(iter, store, false)
+          }
+          allRemovalsTimeMs += 0

Review comment:
       This is just to make sure we make consistent with StateStoreSaveExec, 
but looks like not needed. Let's remove and see.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to