icexelloss commented on a change in pull request #22305:
[SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r242302664
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
##########
@@ -27,17 +27,65 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan}
import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node does not compute
+ * any window aggregation values. Instead, it computes the lower and upper bound for each window
+ * (i.e. window bounds) and passes the data and indices to the Python worker to do the actual
+ * window aggregation.
+ *
+ * It currently materializes all data associated with the same partition key and passes them to
+ * the Python worker. This is not strictly necessary for sliding windows and can be improved (by
+ * possibly slicing data into overlapping chunks and stitching them together).
+ *
+ * This class groups window expressions by their window boundaries so that window expressions
+ * with the same window boundaries can share the same window bounds. The window bounds are
+ * prepended to the data passed to the Python worker.
+ *
+ * For example, if we have:
+ *     avg(v) over specifiedwindowframe(RowFrame, -5, 5),
+ *     avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing),
+ *     avg(v) over specifiedwindowframe(RowFrame, -3, 3),
+ *     max(v) over specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * The Python input will look like:
+ *     (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
+ *
+ * where w1 is specifiedwindowframe(RowFrame, -5, 5),
+ *       w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing),
+ *       w3 is specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * Note that w2 doesn't have bound indices in the Python input because it is an unbounded window,
+ * so its bound indices are always the same.
+ *
+ * Bounded windows and unbounded windows are evaluated differently in the Python worker:
+ * (1) Bounded windows take the window bound indices in addition to the input columns.
+ *     Unbounded windows take only the input columns.
+ * (2) Bounded windows evaluate the UDF once per input row.
+ *     Unbounded windows evaluate the UDF once per window partition. (This is because the UDF
+ *     is deterministic and the window bounds are the same for all input rows.)
Review comment:
SGTM. Removed for now.
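As a rough illustration of the bound-index scheme described in the quoted doc comment: for a bounded row frame such as specifiedwindowframe(RowFrame, -3, 3), each row gets a clamped [lower, upper) index pair over the partition, and the UDF is applied per row over that slice, whereas an unbounded frame needs no per-row bounds. The sketch below is hypothetical and not Spark's actual worker code; the function names are made up for the example.

```python
def row_frame_bounds(n, lower, upper):
    # Hypothetical sketch: clamped [begin, end) index pairs for a bounded
    # row frame over a partition of n rows, mirroring the lower/upper bound
    # columns the doc comment says are prepended for each bounded window.
    return [(max(0, i + lower), min(n, i + upper + 1)) for i in range(n)]

def bounded_window_avg(values, lower, upper):
    # Bounded window: the aggregate (here avg) is evaluated once per input
    # row, over that row's slice of the partition.
    n = len(values)
    return [
        sum(values[b:e]) / (e - b)
        for b, e in row_frame_bounds(n, lower, upper)
    ]

def unbounded_window_avg(values):
    # Unbounded window: no bound indices are needed; the aggregate is
    # computed once per partition and is the same for every row.
    result = sum(values) / len(values)
    return [result] * len(values)
```

For example, `bounded_window_avg([1, 2, 3, 4, 5], -1, 1)` averages each row with its immediate neighbors, clamped at the partition edges.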
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]