Github user yhuai commented on a diff in the pull request:
https://github.com/apache/spark/pull/7057#discussion_r33641140
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -19,17 +19,39 @@ package org.apache.spark.sql.execution
 import java.util
-import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.rdd.RDD
 import org.apache.spark.util.collection.CompactBuffer
+import scala.collection.mutable
 /**
  * :: DeveloperApi ::
- * For every row, evaluates `windowExpression` containing Window Functions and attaches
- * the results with other regular expressions (presented by `projectList`).
- * Evert operator handles a single Window Specification, `windowSpec`.
+ * This class calculates and outputs (windowed) aggregates over the rows in a single sorted group.
+ * The aggregates are calculated for each row in the group. An aggregate can take a few forms:
+ * - Global: The aggregate is calculated for the entire group. Every row has the same value.
+ * - Rows: The aggregate is calculated based on a subset of the window, and is unique for each
+ * row and depends on the position of the given row within the window. The group must be sorted
+ * for this to produce sensible output. Examples are moving averages, running sums and row
+ * numbers.
+ * - Range: The aggregate is calculated based on a subset of the window, and is unique for each
+ * value of the order by clause and depends on its ordering. The group must be sorted for this
+ * to produce sensible output.
+ * - Shifted: The aggregate is a displaced value relative to the position of the given row.
+ * Examples are Lead and Lag.
--- End diff --
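(For readers following the thread: a minimal, hedged sketch of how the four aggregate forms described in the new doc comment surface in the public DataFrame window API. This is not part of the patch; it assumes a Spark version with `SparkSession`, and the table and column names are made up.)

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object WindowFrameForms {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("frame-forms").getOrCreate()
    import spark.implicits._

    val df = Seq(("a", 1, 10.0), ("a", 2, 20.0), ("a", 3, 30.0), ("b", 1, 5.0))
      .toDF("dept", "day", "sales")

    val byDept        = Window.partitionBy("dept")
    val byDeptOrdered = Window.partitionBy("dept").orderBy("day")

    df.select(
      $"dept", $"day", $"sales",
      // Global: one value per group, repeated on every row of the group.
      avg($"sales").over(byDept).as("dept_avg"),
      // Rows: frame bounded by physical row positions, e.g. a running sum.
      sum($"sales").over(byDeptOrdered.rowsBetween(Long.MinValue, 0)).as("running_sum"),
      // Range: frame bounded by the ORDER BY value rather than the row position.
      sum($"sales").over(byDeptOrdered.rangeBetween(Long.MinValue, 0)).as("range_sum"),
      // Shifted: the value of a displaced row (lead/lag).
      lag($"sales", 1).over(byDeptOrdered).as("prev_sales")
    ).show()

    spark.stop()
  }
}
```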
For this PR, one of your main targets is to optimize the `Growing frame` case, right? With your optimization, we can just keep updating a single aggregation buffer and emit its evaluated result for each row, instead of creating a new buffer for every row.
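To make that concrete, here is a minimal sketch of the idea, outside of Spark and with a hypothetical `SumBuffer` type: for an UNBOUNDED PRECEDING to CURRENT ROW ("growing") frame, one mutable buffer is reused and read once per row instead of re-aggregating the prefix for every row.

```scala
// Hypothetical sketch of the growing-frame optimization, not the PR's actual code.
object GrowingFrameSketch {
  // A trivial stand-in for an aggregation buffer.
  final class SumBuffer {
    private var sum = 0.0
    def add(v: Double): Unit = sum += v
    def value: Double = sum
  }

  // Running sums over a sorted group: the frame only ever grows, so each row
  // is folded into the same buffer exactly once (O(n) total, O(1) per row).
  def runningSums(sortedGroup: Seq[Double]): Seq[Double] = {
    val buffer = new SumBuffer
    sortedGroup.map { v =>
      buffer.add(v)
      buffer.value
    }
  }

  def main(args: Array[String]): Unit = {
    println(runningSums(Seq(1.0, 2.0, 3.0))) // prints List(1.0, 3.0, 6.0)
  }
}
```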
I see you also try to optimize the `Shrinking frame` case by reversing the sort order. Then we have to handle some functions very carefully (as you mentioned, `FIRST`/`LAST`). Also, since we reverse the sort order, the window function must be commutative, i.e. insensitive to the order in which input rows are consumed. My concern is that if a user implements a user-defined window function that is not commutative, he/she will not get correct results. I feel the right way is that, before we add this optimization, we first have a separate task to add this kind of property to the function definition. Then we only optimize the functions that are safe to optimize. For example, if a window function is commutative (let's say the `commutative` field in its definition is true), we apply this optimization; otherwise, we do not.
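To illustrate what such a property could look like (a purely hypothetical API, not anything in this PR or in Spark today), the planner would reverse the sort order only for functions that declare themselves order-insensitive:

```scala
// Hypothetical sketch only: a `commutative` flag on window function definitions,
// consulted before applying the reversed-sort (shrinking frame) optimization.
object ShrinkingFrameSketch {
  trait WindowFunctionDef {
    /** True iff feeding the input rows in reverse order yields the same result. */
    def commutative: Boolean
  }

  case object SumDef extends WindowFunctionDef { val commutative = true }
  case object FirstValueDef extends WindowFunctionDef { val commutative = false } // FIRST/LAST are order-sensitive

  def canReverseSortOrder(f: WindowFunctionDef): Boolean = f.commutative

  def main(args: Array[String]): Unit = {
    println(canReverseSortOrder(SumDef))        // true  -> safe to apply the optimization
    println(canReverseSortOrder(FirstValueDef)) // false -> keep the unoptimized path
  }
}
```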