HeartSaVioR commented on a change in pull request #33081:
URL: https://github.com/apache/spark/pull/33081#discussion_r670208288



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
##########
@@ -345,4 +351,179 @@ object AggUtils {
 
     finalAndCompleteAggregate :: Nil
   }
+
+  /**
+   * Plans a streaming session aggregation using the following progression:
+   *
+   *  - Partial Aggregation
+   *    - all tuples will have aggregated columns with initial value
+   *  - (If "spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition" is enabled)
+   *    - Sort within partition (sort: all keys)
+   *    - MergingSessionExec
+   *      - calculate session among tuples, and aggregate tuples in session with partial merge
+   *  - Shuffle & Sort (distribution: keys "without" session, sort: all keys)
+   *  - SessionWindowStateStoreRestore (group: keys "without" session)
+   *    - merge input tuples with stored tuples (sessions) respecting sort order
+   *  - MergingSessionExec
+   *    - calculate session among tuples, and aggregate tuples in session with partial merge
+   *    - NOTE: it leverages the fact that the output of SessionWindowStateStoreRestore is sorted
+   *    - now there is at most 1 tuple per group, key with session
+   *  - SessionWindowStateStoreSave (group: keys "without" session)
+   *    - saves tuple(s) for the next batch (multiple sessions could co-exist at the same time)
+   *  - Complete (output the current result of the aggregation)
+   */
+  def planStreamingAggregationForSession(
+      groupingExpressions: Seq[NamedExpression],
+      sessionExpression: NamedExpression,
+      functionsWithoutDistinct: Seq[AggregateExpression],
+      resultExpressions: Seq[NamedExpression],
+      stateFormatVersion: Int,
+      mergeSessionsInLocalPartition: Boolean,
+      child: SparkPlan): Seq[SparkPlan] = {
+
+    val groupWithoutSessionExpression = groupingExpressions.filterNot { p =>
+      p.semanticEquals(sessionExpression)
+    }
+
+    if (groupWithoutSessionExpression.isEmpty) {
+      throw new AnalysisException("Global aggregation with session window in streaming query" +
+        " is not supported.")
+    }
+
+    val groupingWithoutSessionAttributes = groupWithoutSessionExpression.map(_.toAttribute)
+
+    val groupingAttributes = groupingExpressions.map(_.toAttribute)
+
+    // we don't do partial aggregate here, because it requires additional shuffle

Review comment:
       Yeah, I think this is wrong. This was written around 2 years ago, when
I knew quite a bit less about Spark. Nice finding.
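
For context, here is a minimal sketch of the kind of streaming query this
plan path is meant to serve, assuming the `session_window` function
introduced alongside this feature; the source, column names, and gap
duration are illustrative only:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, session_window}

val spark = SparkSession.builder()
  .master("local[2]")   // illustrative local setup
  .appName("session-window-sketch")
  .getOrCreate()
import spark.implicits._

// Optional: merge sessions per partition before the shuffle, as described in
// the progression documented above (config name taken from that doc).
spark.conf.set(
  "spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition", "true")

// Placeholder streaming source; any source with an event-time column works.
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .selectExpr("timestamp AS eventTime", "CAST(value % 10 AS STRING) AS userId")

// Group by a non-session key plus the session window; the planner requires
// at least one grouping key besides the session expression.
val sessionCounts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy($"userId", session_window($"eventTime", "5 minutes"))
  .agg(count("*").as("numEvents"))

val query = sessionCounts.writeStream
  .outputMode("append")
  .format("console")
  .start()
```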


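And a sketch of the case the guard at the top of
planStreamingAggregationForSession rejects, i.e. grouping only by the session
window with no other key; starting such a query is expected to surface the
"Global aggregation with session window in streaming query is not supported"
error (continuing the illustrative example above):

```scala
// No grouping key besides the session window, so this is a "global" session
// aggregation; per the check above it is not supported for streaming queries.
val globalSessions = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(session_window($"eventTime", "5 minutes"))
  .count()

// Starting this query should hit the AnalysisException thrown above:
// globalSessions.writeStream.outputMode("append").format("console").start()
```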

