Github user icexelloss commented on a diff in the pull request:
https://github.com/apache/spark/pull/22305#discussion_r232393335
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ---
@@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan}
import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a single partition.
+ *
+ * It is very similar to [[WindowExec]] and has similar logic. The main difference is that this
+ * node does not compute any window aggregation values. Instead, it computes the lower and
+ * upper bound for each window (i.e. window bounds) and passes the data and indices to the
+ * Python worker to do the actual window aggregation.
+ *
+ * It currently materializes all data associated with the same partition key and passes them to
+ * Python. This is not strictly necessary for sliding windows and can be improved (by slicing
+ * data into overlapping small chunks and stitching them together).
+ *
+ * This class groups window expressions by their window boundaries so that window expressions
+ * with the same window boundaries can share the same window bounds. The window bounds are
+ * prepended to the data passed to the Python worker.
+ *
+ * For example, if we have:
+ *   avg(v) over specifiedwindowframe(RowFrame, -5, 5),
+ *   avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing),
+ *   avg(v) over specifiedwindowframe(RowFrame, -3, 3),
+ *   max(v) over specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * the Python input will look like:
+ *   (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
+ *
+ * where w1 is specifiedwindowframe(RowFrame, -5, 5),
+ *       w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing),
+ *       w3 is specifiedwindowframe(RowFrame, -3, 3).
+ *
+ * Note that w2 doesn't have bound indices in the Python input because it is an unbounded
+ * window, so its bound indices are always the same.
+ *
+ * Unbounded windows also have a different eval type, because:
+ * (1) They don't have bound indices as input.
+ * (2) The udf only needs to be evaluated once in the Python worker (because the udf is
+ *     deterministic and the window bounds are the same for every row).
+ *
+ * The logic to compute window bounds is delegated to [[WindowFunctionFrame]] and shared with
+ * [[WindowExec]].
+ *
+ * Note that this doesn't support partial aggregation; all aggregation is computed over the
+ * entire window.
+ */
case class WindowInPandasExec(
    windowExpression: Seq[NamedExpression],
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
-    child: SparkPlan) extends UnaryExecNode {
+    child: SparkPlan
+) extends WindowExecBase(windowExpression, partitionSpec, orderSpec, child) {
--- End diff --
Done
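
For context, a minimal usage sketch of the feature this node implements (assuming PySpark 2.4+ with PyArrow installed; the data and column names are illustrative): a grouped-aggregate pandas UDF evaluated over a bounded row frame, which is the case where WindowInPandasExec ships per-row bound indices to the Python worker as described in the Scaladoc above.

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

# A deterministic grouped-agg pandas UDF; for a bounded frame the worker
# applies it to each per-row slice of v delimited by the bound indices.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

# Bounded row frame: the plan node prepends the per-row lower/upper
# bound indices to the data sent to the Python worker.
w = Window.partitionBy("id").orderBy("v").rowsBetween(-2, Window.currentRow)
df.withColumn("mean_v", mean_udf(df["v"]).over(w)).show()
```

With an unbounded frame (e.g. `rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)`), no bound indices are sent and the UDF result can be computed once per partition, matching points (1) and (2) above.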