icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r242302664
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##########
 @@ -27,17 +27,65 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a
+ * single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node
+ * does not compute any window aggregation values. Instead, it computes the
+ * lower and upper bound for each window (i.e. window bounds) and passes the
+ * data and indices to the Python worker, which performs the actual window
+ * aggregation.
+ *
+ * It currently materializes all data associated with the same partition key
+ * and passes them to the Python worker. This is not strictly necessary for
+ * sliding windows and can be improved (by possibly slicing the data into
+ * overlapping chunks and stitching them together).
+ *
+ * This class groups window expressions by their window boundaries so that
+ * window expressions with the same window boundaries can share the same
+ * window bounds. The window bounds are prepended to the data passed to the
+ * Python worker.
+ *
+ * For example, if we have:
+ *     avg(v) over specifiedwindowframe(RowFrame, -5, 5),
+ *     avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing),
+ *     avg(v) over specifiedwindowframe(RowFrame, -3, 3),
+ *     max(v) over specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * The Python input will look like:
+ * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
+ *
+ * where w1 is specifiedwindowframe(RowFrame, -5, 5)
+ *       w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing)
+ *       w3 is specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * Note that w2 doesn't have bound indices in the Python input because it is
+ * an unbounded window, so its bound indices would always be the same.
+ *
+ * Bounded and unbounded windows are evaluated differently in the Python
+ * worker:
+ * (1) A bounded window takes the window bound indices in addition to the
+ *     input columns. An unbounded window takes only the input columns.
+ * (2) A bounded window evaluates the UDF once per input row. An unbounded
+ *     window evaluates the UDF once per window partition (because the UDF is
+ *     deterministic and the window bounds are the same for all input rows).
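
For illustration only (not part of the patch): the following is a minimal Python sketch of the scheme the Scaladoc describes, under the assumption of row-based frames. The helper names `row_frame_bounds` and `evaluate_bounded_window` are hypothetical, not actual Spark APIs. Bound indices are computed once per distinct frame, prepended as extra columns, and the UDF is applied once per input row to the slice its bounds delimit; the two expressions over w3 share one pair of bound columns.

```python
import pandas as pd

def row_frame_bounds(n, start, end):
    # Hypothetical helper: per-row [lower, upper) slice indices for
    # specifiedwindowframe(RowFrame, start, end), clamped to the partition.
    lower = [max(0, i + start) for i in range(n)]
    upper = [min(n, i + end + 1) for i in range(n)]
    return lower, upper

def evaluate_bounded_window(udf, values, lower, upper):
    # Bounded window: one UDF call per input row, on the slice its bounds delimit.
    return [udf(values.iloc[lo:hi]) for lo, hi in zip(lower, upper)]

v = pd.Series([1.0, 2.0, 3.0, 4.0, 5.0])
n = len(v)

# Two distinct bounded frames from the example: w1 = (-5, 5), w3 = (-3, 3).
# avg(v) over w3 and max(v) over w3 share the same bound columns.
lb_w1, ub_w1 = row_frame_bounds(n, -5, 5)
lb_w3, ub_w3 = row_frame_bounds(n, -3, 3)

# Sketch of the Python worker input: bound columns prepended to the data column.
worker_input = pd.DataFrame({
    "lower_bound_w1": lb_w1, "upper_bound_w1": ub_w1,
    "lower_bound_w3": lb_w3, "upper_bound_w3": ub_w3,
    "v": v,
})

avg_w3 = evaluate_bounded_window(lambda s: s.mean(), v, lb_w3, ub_w3)
max_w3 = evaluate_bounded_window(lambda s: s.max(), v, lb_w3, ub_w3)
```

The unbounded w2 needs no bound columns: its UDF would be called once on the whole partition and the result broadcast to every row.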
 
 Review comment:
   SGTM. Removed for now.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
