HeartSaVioR commented on a change in pull request #33081:
URL: https://github.com/apache/spark/pull/33081#discussion_r670208288
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
##########
@@ -345,4 +351,179 @@ object AggUtils {
finalAndCompleteAggregate :: Nil
}
+
+ /**
+ * Plans a streaming session aggregation using the following progression:
+ *
+ * - Partial Aggregation
+ * - all tuples will have aggregated columns with initial value
+ * - (If "spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition" is enabled)
+ * - Sort within partition (sort: all keys)
+ * - MergingSessionExec
+ * - calculate session among tuples, and aggregate tuples in session with partial merge
+ * - Shuffle & Sort (distribution: keys "without" session, sort: all keys)
+ * - SessionWindowStateStoreRestore (group: keys "without" session)
+ * - merge input tuples with stored tuples (sessions) respecting sort order
+ * - MergingSessionExec
+ * - calculate session among tuples, and aggregate tuples in session with partial merge
+ * - NOTE: it leverages the fact that the output of SessionWindowStateStoreRestore is sorted
+ * - now there is at most 1 tuple per group, key with session
+ * - SessionWindowStateStoreSave (group: keys "without" session)
+ * - saves tuple(s) for the next batch (multiple sessions could co-exist at the same time)
+ * - Complete (output the current result of the aggregation)
+ */
+ def planStreamingAggregationForSession(
+ groupingExpressions: Seq[NamedExpression],
+ sessionExpression: NamedExpression,
+ functionsWithoutDistinct: Seq[AggregateExpression],
+ resultExpressions: Seq[NamedExpression],
+ stateFormatVersion: Int,
+ mergeSessionsInLocalPartition: Boolean,
+ child: SparkPlan): Seq[SparkPlan] = {
+
+ val groupWithoutSessionExpression = groupingExpressions.filterNot { p =>
+ p.semanticEquals(sessionExpression)
+ }
+
+ if (groupWithoutSessionExpression.isEmpty) {
+ throw new AnalysisException("Global aggregation with session window in streaming query" +
+ " is not supported.")
+ }
+
+ val groupingWithoutSessionAttributes = groupWithoutSessionExpression.map(_.toAttribute)
+
+ val groupingAttributes = groupingExpressions.map(_.toAttribute)
+
+ // we don't do partial aggregate here, because it requires additional shuffle
Review comment:
Yeah, I think this is wrong. This was written around 2 years ago, when I knew much less about Spark. Nice finding.
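[Editor's note] For readers unfamiliar with session windows, here is a minimal, Spark-independent sketch of the "calculate session among tuples, and aggregate tuples in session" step described in the doc comment of the diff above: events sorted by time are merged into one session whenever the next event falls within the current session's window, otherwise a new session starts. All names here (`Event`, `Session`, `mergeSessions`, `gapMs`) are hypothetical illustrations, not identifiers from the PR.

```scala
// Spark-independent sketch of per-key session-window merging with aggregation.
// An event extends the current session if it arrives before the session's end
// (event time + gap); otherwise it opens a new session.
object SessionMergeSketch {
  case class Event(key: String, time: Long, value: Long)
  case class Session(key: String, start: Long, end: Long, sum: Long)

  def mergeSessions(events: Seq[Event], gapMs: Long): Seq[Session] = {
    events.groupBy(_.key).toSeq.sortBy(_._1).flatMap { case (key, evs) =>
      val sorted = evs.sortBy(_.time)
      sorted.foldLeft(List.empty[Session]) {
        // Event falls inside the current session's window: extend it and
        // fold the event's value into the running aggregate (partial merge).
        case (cur :: rest, e) if e.time <= cur.end =>
          Session(key, cur.start, math.max(cur.end, e.time + gapMs),
            cur.sum + e.value) :: rest
        // Gap exceeded (or first event for this key): start a new session.
        case (acc, e) =>
          Session(key, e.time, e.time + gapMs, e.value) :: acc
      }.reverse
    }
  }
}
```

Note how the logic relies on the input being sorted by time within each key, which mirrors why the planned `MergingSessionExec` can leverage the sort order produced by `SessionWindowStateStoreRestore`.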
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]